6.10.2. Installing Kafka Replication

Installing Kafka replication requires special configuration of the master and slave hosts so that each host is configured for the correct datasource type.

6.10.2.1. Installing Kafka Replication (Staging Method)

Configuration of the replication deployment to Kafka can be performed using a single tpm staging-based deployment. However, because the configuration must be different for the master and slave hosts, the configuration must be performed in multiple steps.

  1. Unpack the Tungsten Replicator distribution into a staging directory:

    shell> tar zxf tungsten-replicator-6.0.3-599.tar.gz
  2. Change into the staging directory:

    shell> cd tungsten-replicator-6.0.3-599

    Prepare the required filter configuration file as follows on the Kafka applier slave host(s) only:

    shell> mkdir -p /opt/replicator/share/
    shell> cp tungsten-replicator/support/filters-config/convertstringfrommysql.json /opt/replicator/share/
  3. Configure the default parameters for the replicator service:

    shell> ./tools/tpm configure defaults \
    	--install-directory=/opt/replicator \
    	--disable-relay-logs=true \
    	--mysql-allow-intensive-checks \
    	"--profile-script=~/.bash_profile" \
    	--skip-validation-check=HostsFileCheck \
    	--skip-validation-check=MySQLUnsupportedDataTypesCheck \
    	--start-and-report \
    	--user=tungsten
    


  4. Configure the parameters for the master replicator service that will be installed on host mysqlhost:

    shell> ./tools/tpm configure alpha \
    	--hosts=mysqlhost \
    	--topology=master-slave \
    	--master=mysqlhost \
    	--members=mysqlhost,kafka \
    	--replication-user=tungsten \
    	--replication-password=secret \
    	--replication-port=3306 \
    	--enable-heterogeneous-service=true
    


    If your MySQL source is a Cluster, add the following to your current cluster configuration instead:

    shell> ./tools/tpm configure alpha \
    	--repl-svc-extractor-filters=colnames,pkey \
    	--property=replicator.filter.pkey.addColumnsToDeletes=true \
    	--property=replicator.filter.pkey.addPkeyToInserts=true
    
    


  5. Configure the parameters for the slave replicator service that will be installed on host kafka:

    shell> ./tools/tpm configure alpha \
    	--hosts=kafka \
    	--datasource-type=kafka \
    	--replication-host=kafka \
    	--replication-password=root \
    	--replication-port=9092 \
    	--replication-user=root
    


    If your MySQL source is a Cluster, use this configuration instead:

    shell> ./tools/tpm configure beta \
    	--hosts=kafka \
    	--topology=cluster-slave \
    	--relay-source=alpha \
    	--relay=kafka \
    	--datasource-type=kafka \
    	--property=replicator.stage.remote-to-thl.filters=convertstringfrommysql \
    	--property=replicator.filter.convertstringfrommysql.definitionsFile=/opt/replicator/share/convertstringfrommysql.json \
    	--replication-host=kafka \
    	--replication-password=root \
    	--replication-port=9092 \
    	--replication-user=root
    


  6. Install the services:

    shell> ./tools/tpm install

    If your MySQL source is an existing Cluster, update the configuration instead:

    shell> ./tools/tpm update

If you encounter problems during the installation, check the output of the /tmp/tungsten-configure.log file for more information about the root cause.

Once the service is configured and running, the service can be monitored as normal using the trepctl command. See Section 6.10.3, “Management and Monitoring of Kafka Deployments” for more information.

6.10.2.2. Installing Kafka Replication (INI Method)

INI-based configuration of the replication deployment to Kafka requires a different INI configuration on the master and slave hosts.

  1. Configure the ini parameters on the host that will run the master replicator service:

    
    [defaults]
    install-directory=/opt/replicator
    disable-relay-logs=true
    mysql-allow-intensive-checks=true
    profile-script=~/.bash_profile
    skip-validation-check=HostsFileCheck
    skip-validation-check=MySQLUnsupportedDataTypesCheck
    start-and-report=true
    user=tungsten
    
    [alpha]
    topology=master-slave
    master=mysqlhost
    members=mysqlhost,kafka
    replication-user=tungsten
    replication-password=secret
    replication-port=3306
    enable-heterogeneous-master=true
    


    If your MySQL source is a Cluster, add the following to your current cluster configuration instead:

    [alpha]
    ...existing cluster configs here...
    repl-svc-extractor-filters=colnames,pkey
    property=replicator.filter.pkey.addColumnsToDeletes=true
    property=replicator.filter.pkey.addPkeyToInserts=true
    
    


  2. Configure the ini parameters on the slave host that will apply the events to Kafka:

    
    [defaults]
    install-directory=/opt/replicator
    disable-relay-logs=true
    mysql-allow-intensive-checks=true
    profile-script=~/.bash_profile
    skip-validation-check=HostsFileCheck
    skip-validation-check=MySQLUnsupportedDataTypesCheck
    start-and-report=true
    user=tungsten
    
    [alpha]
    topology=master-slave
    master=mysqlhost
    members=mysqlhost,kafka
    datasource-type=kafka
    replication-host=kafka
    replication-password=null
    replication-port=9092
    replication-user=root
    

    The above configures the Kafka slave to accept replication data from the master.


    If your MySQL source is a Cluster, use this configuration instead:

    
    [defaults]
    disable-relay-logs=true
    disable-security-controls=true
    install-directory=/opt/replicator
    mysql-allow-intensive-checks=true
    profile-script=~/.bash_profile
    skip-validation-check=HostsFileCheck
    skip-validation-check=MySQLUnsupportedDataTypesCheck
    start-and-report=true
    user=tungsten
    
    [alpha]
    ...existing cluster configs here...
    topology=clustered
    
    [beta]
    topology=cluster-slave
    relay=kafka
    relay-source=alpha
    datasource-type=kafka
    property=replicator.stage.remote-to-thl.filters=convertstringfrommysql
    property=replicator.filter.convertstringfrommysql.definitionsFile=/opt/replicator/share/convertstringfrommysql.json
    replication-host=kafka
    replication-password=null
    replication-port=9092
    replication-user=root
    
    

    The above configures the Kafka slave to accept replication data from the cluster nodes.


  3. Using the INI method requires that the following steps be performed on EVERY node:

    1. Unpack the Tungsten Replicator distribution in the staging directory:

      shell> tar zxf tungsten-replicator-6.0.3-599.tar.gz
    2. Change into the staging directory:

      shell> cd tungsten-replicator-6.0.3-599
    3. Install the services:

      shell> ./tools/tpm install

If you encounter problems during the installation, check the output of the /tmp/tungsten-configure.log file for more information about the root cause.

Once the service is configured and running, the service can be monitored as normal using the trepctl command. See Section 6.10.3, “Management and Monitoring of Kafka Deployments” for more information.

6.10.2.3. Optional Configuration Parameters for Kafka

A number of optional, configurable properties are available that control how Tungsten Replicator applies and populates information when the data is written into Kafka. The following properties can be set during configuration using --property=PROPERTYNAME=value:

Table 6.4. Optional Kafka Applier Properties

replicator.applier.dbms.embedCommitTime
    Sets whether the commit time for the source row is embedded into the document
replicator.applier.dbms.embedSchemaTable
    Embed the source schema name and table name in the stored document
replicator.applier.dbms.enabletxinfo.kafka
    Embeds transaction information (generated by the rowaddtxninfo filter) into each Kafka message
replicator.applier.dbms.enabletxninfoTopic
    Embeds transaction information into a separate Kafka message broadcast on an independent channel from the one used by the actual database data. One message is sent per transaction or THL event.
replicator.applier.dbms.keyFormat
    Determines the format of the message ID
replicator.applier.dbms.requireacks
    Defines how many acknowledgements are required from Kafka nodes when writing messages to the cluster
replicator.applier.dbms.retrycount
    The number of retries for sending each message
replicator.applier.dbms.txninfoTopic
    Sets the topic name for transaction messages
replicator.applier.dbms.zookeeperString
    Connection string for Zookeeper, including hostname and port
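
For example, using the INI method, properties from the table above could be added to the service section of the configuration in the same way as the other property= lines shown earlier in this section. The property names are real; the values chosen here are illustrative only:

```ini
[alpha]
...existing service configuration...
property=replicator.applier.dbms.keyFormat=tspkeyus
property=replicator.applier.dbms.retrycount=3
```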

replicator.applier.dbms.embedCommitTime

Option          replicator.applier.dbms.embedCommitTime
Description     Sets whether the commit time for the source row is embedded into the document
Value Type      boolean
Default         true
Valid Values    false  Do not embed the source database commit time
                true   Embed the source database commit time into the stored document

Embeds the commit time of the source database row into the document information:

{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : {
    "msg" : "Hello Kafka",
    "id" : "2384726"
  },
  "_optype" : "INSERT"
}
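
As a minimal sketch of how a consumer might use this field, the embedded _committime string can be parsed with Python's standard library. The payload below is the example message above; no Kafka client is involved:

```python
import json
from datetime import datetime

# Example message payload as stored by the applier (from the sample above).
payload = '''{
 "_seqno" : "4865",
 "_source_table" : "msg",
 "_committime" : "2017-07-13 15:30:37.0",
 "_source_schema" : "test",
 "record" : { "msg" : "Hello Kafka", "id" : "2384726" },
 "_optype" : "INSERT"
}'''

doc = json.loads(payload)

# The commit time is formatted as "YYYY-MM-DD HH:MM:SS.f";
# %f accepts the single trailing fractional digit.
commit_time = datetime.strptime(doc["_committime"], "%Y-%m-%d %H:%M:%S.%f")
```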

replicator.applier.dbms.embedSchemaTable

Option          replicator.applier.dbms.embedSchemaTable
Description     Embed the source schema name and table name in the stored document
Value Type      boolean
Default         true
Valid Values    false  Do not embed the schema or database name in the document
                true   Embed the source schema name and database name into the stored document

If enabled, the document stored into Kafka will include the source schema and table name. This can be used to identify the source of the information if the schema and table name are not being used for the topic name.

{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : {
    "msg" : "Hello Kafka",
    "id" : "2384726"
  },
  "_optype" : "INSERT"
}

replicator.applier.dbms.enabletxinfo.kafka

Option          replicator.applier.dbms.enabletxinfo.kafka
Description     Embeds transaction information (generated by the rowaddtxninfo filter) into each Kafka message
Value Type      boolean
Default         false
Valid Values    false  Do not include transaction information in each message
                true   Embed transaction information into each Kafka message

Embeds information about the entire transaction into each message sent, using the data provided by the rowaddtxninfo filter and other information embedded in each THL event. The transaction information includes row counts, the event ID, and the tables modified. Since one message is normally sent for each row of data, adding the full transaction information to each message makes it possible to validate and identify which other messages may be part of a single transaction when the messages are re-assembled by a Kafka client.

For example, when looking at a single message in Kafka, the message includes a txninfo section:

{
  "_source_table" : "msg",
  "_committime" : "2018-03-07 12:53:21.0",
  "record" : {
    "msg2" : "txinfo",
    "id" : "109",
    "msg" : "txinfo"
  },
  "_optype" : "INSERT",
  "_seqno" : "164",
  "txnInfo" : {
    "schema" : [
      {
        "schemaName" : "msg",
        "rowCount" : "1",
        "tableName" : "msg"
      },
      {
        "rowCount" : "2",
        "schemaName" : "msg",
        "tableName" : "msgsub"
      }
    ],
    "serviceName" : "alpha",
    "totalCount" : "3",
    "tungstenTransId" : "164",
    "firstRecordInTransaction" : "true"
  },
  "_source_schema" : "msg"
}

This block of the overall message includes the following objects and information:

  • schema

    An array of the row counts within this transaction, with a row count included for each schema and table.

  • serviceName

    The name of the Tungsten Replicator service that generated the message.

  • totalCount

    The total number of rows modified within the entire transaction.

  • firstRecordInTransaction

    If this field exists, it should always be set to true and indicates that this message was generated by the first row inserted, updated or deleted in the overall transaction. This effectively indicates the start of the overall transaction.

  • lastRecordInTransaction

    If this field exists, it should always be set to true and indicates that this message was generated by the last row inserted, updated or deleted in the overall transaction. This effectively indicates the end of the overall transaction.

Note that this information block is included in every message for each row within an overall transaction. The firstRecordInTransaction and lastRecordInTransaction can be used to identify the start and end of the transaction overall.
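
A consuming client can use these markers to group per-row messages back into transactions. The following is a minimal Python sketch, operating on already-decoded message dictionaries rather than a live Kafka consumer; it assumes messages arrive in order, as within a single Kafka partition:

```python
def group_transactions(messages):
    """Group per-row messages into transactions using the txnInfo block.

    Relies on the firstRecordInTransaction / lastRecordInTransaction
    markers described above; messages are assumed to be in commit order.
    """
    transactions = []
    current = []
    for msg in messages:
        info = msg.get("txnInfo", {})
        if info.get("firstRecordInTransaction") == "true":
            current = []          # start of a new transaction
        current.append(msg)
        if info.get("lastRecordInTransaction") == "true":
            transactions.append(current)
            current = []          # transaction complete
    return transactions

# Three rows of the same transaction (seqno 164): first, middle, last.
rows = [
    {"_seqno": "164", "txnInfo": {"tungstenTransId": "164",
                                  "firstRecordInTransaction": "true"}},
    {"_seqno": "164", "txnInfo": {"tungstenTransId": "164"}},
    {"_seqno": "164", "txnInfo": {"tungstenTransId": "164",
                                  "lastRecordInTransaction": "true"}},
]
txns = group_transactions(rows)
```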

replicator.applier.dbms.enabletxninfoTopic

Option          replicator.applier.dbms.enabletxninfoTopic
Description     Embeds transaction information into a separate Kafka message broadcast on an independent channel from the one used by the actual database data. One message is sent per transaction or THL event.
Value Type      boolean
Default         false
Valid Values    false  Do not generate transaction information
                true   Send transaction information on a separate Kafka topic for each transaction

If enabled, a separate message containing information about the entire transaction is sent on its own Kafka topic. The topic name can be configured by setting the replicator.applier.dbms.txninfoTopic property.

The default message sent will look like the following example:

{
  "txnInfo" : {
    "tungstenTransId" : "164",
    "schema" : [
      {
        "schemaName" : "msg",
        "rowCount" : "1",
        "tableName" : "msg"
      },
      {
        "schemaName" : "msg",
        "rowCount" : "2",
        "tableName" : "msgsub"
      }
    ],
    "totalCount" : "3",
    "serviceName" : "alpha"
  }
}

This block of the overall message includes the following objects and information:

  • schema

    An array of the row counts within this transaction, with a row count included for each schema and table.

  • serviceName

    The name of the Tungsten Replicator service that generated the message.

  • totalCount

    The total number of rows modified within the entire transaction.
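
A consumer can sanity-check a transaction information message by verifying that the per-table row counts add up to totalCount. A small illustrative check in Python, using the example payload above (note that all numeric values are transmitted as strings):

```python
import json

# Transaction information message, as shown in the example above.
payload = '''{
 "txnInfo" : {
  "tungstenTransId" : "164",
  "schema" : [
   { "schemaName" : "msg", "rowCount" : "1", "tableName" : "msg" },
   { "schemaName" : "msg", "rowCount" : "2", "tableName" : "msgsub" }
  ],
  "totalCount" : "3",
  "serviceName" : "alpha"
 }
}'''

info = json.loads(payload)["txnInfo"]

# Numeric fields arrive as strings, so convert before summing.
row_total = sum(int(entry["rowCount"]) for entry in info["schema"])
consistent = row_total == int(info["totalCount"])
```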

replicator.applier.dbms.keyFormat

Option          replicator.applier.dbms.keyFormat
Description     Determines the format of the message ID
Value Type      string
Default         pkey
Valid Values    pkey      Combine the primary key column values into a single string
                pkeyus    Combine the primary key column values into a single string joined by an underscore character
                tspkey    Combine the schema name, table name, and primary key column values into a single string
                tspkeyus  Combine the schema name, table name, and primary key column values into a single string joined by an underscore character

Determines the format of the message ID used when sending the message into Kafka. For example, when configured to use tspkeyus, the message ID consists of the schema name, table name and primary key column information separated by underscores, for example SCHEMANAME_TABLENAME_234.
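
The four formats can be illustrated with a short Python sketch. The helper below is hypothetical, not part of the product (the real applier constructs the ID internally); it simply mirrors the value descriptions above:

```python
def message_id(key_format, schema, table, pkey_values):
    """Build a message ID in the style of the keyFormat setting.

    Illustrative only: mirrors the documented pkey/pkeyus/tspkey/tspkeyus
    formats rather than the applier's internal implementation.
    """
    pkey_values = [str(v) for v in pkey_values]
    if key_format == "pkey":
        return "".join(pkey_values)
    if key_format == "pkeyus":
        return "_".join(pkey_values)
    if key_format == "tspkey":
        return schema + table + "".join(pkey_values)
    if key_format == "tspkeyus":
        return "_".join([schema, table] + pkey_values)
    raise ValueError("unknown keyFormat: " + key_format)

print(message_id("tspkeyus", "SCHEMANAME", "TABLENAME", [234]))
# SCHEMANAME_TABLENAME_234
```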

replicator.applier.dbms.requireacks

Option          replicator.applier.dbms.requireacks
Description     Defines how many acknowledgements are required from Kafka nodes when writing messages to the cluster
Value Type      string
Default         all
Valid Values    1    Only the lead host should acknowledge receipt of the message
                all  All nodes should acknowledge receipt of the message

Sets the acknowledgement counter for sending messages into the Kafka queue.

replicator.applier.dbms.retrycount

Option          replicator.applier.dbms.retrycount
Description     The number of retries for sending each message
Value Type      number
Default         0

Determines the number of times the replicator will attempt to send the message before reporting a failure.

replicator.applier.dbms.txninfoTopic

Option          replicator.applier.dbms.txninfoTopic
Description     Sets the topic name for transaction messages
Value Type      string
Default         tungsten_transactions

Sets the topic name to be used when sending independent transaction information messages about each THL event. See replicator.applier.dbms.enabletxninfoTopic.

replicator.applier.dbms.zookeeperString

Option          replicator.applier.dbms.zookeeperString
Description     Connection string for Zookeeper, including hostname and port
Value Type      string
Default         ${replicator.global.db.host}:2181

The string to be used when connecting to Zookeeper. The default is to use port 2181 on the host used by replicator.global.db.host.