5.4.2. Install Kafka Applier

Installing Kafka replication requires separate configuration of the master and slave hosts so that each uses the correct datasource type.

  1. Before installing the applier, the extractor configuration must be extended. Apply the following parameters on the extractor host, update the extractor as shown below, and then install the applier.

    • For Staging Install:

      shell> cd tungsten-replicator-5.4.0-48
      shell> ./tools/tpm configure alpha \
        --repl-svc-extractor-filters=colnames,pkey \
        --property=replicator.filter.pkey.addColumnsToDeletes=true \
        --property=replicator.filter.pkey.addPkeyToInserts=true
      shell> ./tools/tpm update
    • For INI Install: Add the following to /etc/tungsten/tungsten.ini:

      
      [alpha]
      ...Existing Replicator Config...
      repl-svc-extractor-filters=colnames,pkey
      property=replicator.filter.pkey.addColumnsToDeletes=true
      property=replicator.filter.pkey.addPkeyToInserts=true
      
      
      shell> tpm update
  2. Unpack the Tungsten Replicator distribution in the staging directory:

    shell> tar zxf tungsten-replicator-5.4.0-48.tar.gz
  3. Change into the staging directory:

    shell> cd tungsten-replicator-5.4.0-48
  4. Configure the installation using tpm:

    For Staging Install:

    shell> ./tools/tpm configure defaults \
        --reset \
        --install-directory=/opt/continuent \
        --profile-script=~/.bash_profile
    
    shell> ./tools/tpm configure alpha \
        --master=sourcehost \
        --members=localhost \
        --datasource-type=kafka \
        --replication-user=root \
        --replication-password=null \
        --replication-port=9092 \
        --property=replicator.applier.dbms.zookeeperString=localhost:2181 \
        --property=replicator.applier.dbms.requireacks=1

    For INI Install:
    
    shell> vi /etc/tungsten/tungsten.ini
    [defaults]
    install-directory=/opt/continuent
    profile-script=~/.bash_profile
    
    [alpha]
    master=sourcehost
    members=localhost
    datasource-type=kafka
    replication-user=root
    replication-password=null
    replication-port=9092
    property=replicator.applier.dbms.zookeeperString=localhost:2181
    property=replicator.applier.dbms.requireacks=1
    

  5. If your MySQL source is a Tungsten Cluster, ensure that the additional steps below are also included in your applier configuration.

    First, prepare the required filter configuration file as follows on the Kafka applier slave host(s) only:

    shell> mkdir -p /opt/continuent/share/
    shell> cp tungsten-replicator/support/filters-config/convertstringfrommysql.json /opt/continuent/share/

    Then, include the following parameters in the configuration:

    property=replicator.stage.remote-to-thl.filters=convertstringfrommysql
    property=replicator.filter.convertstringfrommysql.definitionsFile=/opt/continuent/share/convertstringfrommysql.json
    
  6. Once the prerequisites and the configuration of the installation have been completed, the software can be installed:

    shell> ./tools/tpm install

If you encounter problems during the installation, check the /tmp/tungsten-configure.log file for more information about the root cause.
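
As a starting point for that check, the following sketch prints the most recent lines in the log that mention an error. The function name show_install_errors and the case-insensitive search for "error" are assumptions made for illustration; the log format is not described in this section, so adjust the pattern to what the file actually contains.

```shell
# Print the last 20 lines (with line numbers) that mention "error" in a log file.
# The path defaults to the log named in this section; pass another path to override.
# NOTE: searching for the word "error" is a heuristic, not a documented log format.
show_install_errors() {
    log="${1:-/tmp/tungsten-configure.log}"
    if [ ! -r "$log" ]; then
        echo "log not readable: $log" >&2
        return 1
    fi
    grep -i -n 'error' "$log" | tail -n 20
}

# Usage:
#   show_install_errors
#   show_install_errors /path/to/another.log
```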

Once the service is configured and running, the service can be monitored as normal using the trepctl command. See Section 5.4.3, “Management and Monitoring of Kafka Deployments” for more information.
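
For a quick check from a script, the sketch below filters the output of trepctl status down to the replicator state. The helper name replicator_state is hypothetical, and the filter assumes that each line of the status output has the form "name : value" with a field called state; verify this against the output on your own host before relying on it.

```shell
# Read `trepctl status` output on stdin and print the value of the "state" field.
# ASSUMPTION: the output uses "name : value" lines; adjust the pattern if it differs.
replicator_state() {
    sed -n 's/^state *: *//p'
}

# Usage on the applier host (requires an installed, running replicator):
#   trepctl status | replicator_state
```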

5.4.2.1. Optional Configuration Parameters for Kafka

A number of optional properties are available that control how Tungsten Replicator applies and populates information when data is written into Kafka. The following properties can be set during configuration using --property=PROPERTYNAME=value:

Table 5.3. Optional Kafka Applier Properties

Option                                    Description
replicator.applier.dbms.embedCommitTime   Sets whether the commit time for the source row is embedded into the document
replicator.applier.dbms.embedSchemaTable  Embed the source schema name and table name in the stored document
replicator.applier.dbms.keyFormat         Determines the format of the message ID
replicator.applier.dbms.requireacks       Defines how many acknowledgements from Kafka nodes are required when writing messages to the Kafka cluster
replicator.applier.dbms.retrycount        The number of retries for sending each message
replicator.applier.dbms.zookeeperString   Connection string for Zookeeper, including hostname and port
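
As an illustrative sketch for a Staging install, optional properties might be set as follows, reusing the tpm configure and tpm update commands from the installation steps. The values shown (tspkeyus and 3) are example choices only, not recommendations; for a first-time install the same --property options could instead be added to the tpm configure command before running tpm install.

```shell
# Run from the staging directory on the applier host.
# The values below are illustrative; see the property descriptions for valid settings.
./tools/tpm configure alpha \
    --property=replicator.applier.dbms.keyFormat=tspkeyus \
    --property=replicator.applier.dbms.retrycount=3

# Apply the changed configuration to the installed service.
./tools/tpm update
```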

replicator.applier.dbms.embedCommitTime

Option        replicator.applier.dbms.embedCommitTime
Description   Sets whether the commit time for the source row is embedded into the document
Value Type    boolean
Default       true
Valid Values  false  Do not embed the source database commit time
              true   Embed the source database commit time into the stored document

Embeds the commit time of the source database row into the document information:

{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : {
    "msg" : "Hello Kafka",
    "id" : "2384726"
  },
  "_optype" : "INSERT"
}

replicator.applier.dbms.embedSchemaTable

Option        replicator.applier.dbms.embedSchemaTable
Description   Embed the source schema name and table name in the stored document
Value Type    boolean
Default       true
Valid Values  false  Do not embed the schema or table name in the document
              true   Embed the source schema name and table name into the stored document

If enabled, the document stored into Kafka will include the source schema and table name. This can be used to identify the source of the information.

{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : {
    "msg" : "Hello Kafka",
    "id" : "2384726"
  },
  "_optype" : "INSERT"
}

replicator.applier.dbms.keyFormat

Option        replicator.applier.dbms.keyFormat
Description   Determines the format of the message ID
Value Type    string
Default       pkey
Valid Values  pkey      Combine the primary key column values into a single string
              pkeyus    Combine the primary key column values into a single string joined by an underscore character
              tspkey    Combine the schema name, table name, and primary key column values into a single string
              tspkeyus  Combine the schema name, table name, and primary key column values into a single string joined by an underscore character

Determines the format of the message ID used when sending the message into Kafka. For example, when configured to use tspkeyus, the message ID consists of the schema name, table name, and primary key column information separated by underscores: SCHEMANAME_TABLENAME_234.
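
To make the underscore-joined formats concrete, the following sketch builds a message ID from its parts in the same pattern as the example above. It is an illustration only, not the replicator's implementation: the function name make_key is hypothetical, and only the pkeyus and tspkeyus formats are covered, because the exact way values are combined in the other two formats is not spelled out here.

```shell
# Build an underscore-joined message ID from its parts (illustrative only).
# Usage: make_key FORMAT SCHEMA TABLE PKEY_VALUE...
make_key() {
    format="$1"; schema="$2"; table="$3"
    shift 3
    # Join the remaining arguments (the primary key values) with underscores.
    pk=$(IFS=_; printf '%s' "$*")
    case "$format" in
        pkeyus)   printf '%s\n' "$pk" ;;
        tspkeyus) printf '%s_%s_%s\n' "$schema" "$table" "$pk" ;;
        *)        echo "format not covered by this sketch: $format" >&2; return 1 ;;
    esac
}

# Usage:
#   make_key tspkeyus test msg 2384726
#   make_key pkeyus test msg 10 20
```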

replicator.applier.dbms.requireacks

Option        replicator.applier.dbms.requireacks
Description   Defines how many acknowledgements from Kafka nodes are required when writing messages to the Kafka cluster
Value Type    string
Default       all
Valid Values  1    Only the lead host should acknowledge receipt of the message
              all  All nodes should acknowledge receipt of the message

Sets the acknowledgement counter for sending messages into the Kafka queue.

replicator.applier.dbms.retrycount

Option        replicator.applier.dbms.retrycount
Description   The number of retries for sending each message
Value Type    number
Default       0

Determines the number of times sending a message is retried before the send is treated as failed.

replicator.applier.dbms.zookeeperString

Option        replicator.applier.dbms.zookeeperString
Description   Connection string for Zookeeper, including hostname and port
Value Type    string
Default       ${replicator.global.db.host}:2181

The string to be used when connecting to Zookeeper. The default is to use port 2181 on the host used by replicator.global.db.host.