Tungsten Replicator

Install Kafka Applier

Installing Kafka replication requires special configuration of the Extractor and Applier hosts so that each is configured for the correct datasource type.

  1. Before installing the applier, add the following parameter to the extractor configuration.

    Add the following to /etc/tungsten/tungsten.ini

    [alpha]
    ...Existing Replicator Config...
    enable-heterogeneous-service=true

    shell> tpm update
    Note

    The above step is only applicable for standalone extractors. If you are configuring replication from an existing Tungsten Cluster (Cluster-Extractor), follow the steps outlined in "Replicating Data Out of a Cluster" to ensure the cluster is configured correctly.

  2. Unpack the Tungsten Replicator distribution in the staging directory:

    shell> tar zxf tungsten-replicator-8.0.4-132.tar.gz
  3. Change into the staging directory:

    shell> cd tungsten-replicator-8.0.4-132
  4. Create the /etc/tungsten/tungsten.ini file using the example below as a template.

    Example tungsten.ini
    [defaults]
    install-directory=/opt/continuent
    profile-script=~/.bash_profile
    rest-api-admin-user=apiuser
    rest-api-admin-password=secret
    replicator-rest-api-address=0.0.0.0

    [alpha]
    master=sourcehost
    members=localhost
    datasource-type=kafka
    replication-user=root
    replication-password=null
    replication-port=9092
    property=replicator.applier.dbms.zookeeperString=localhost:2181
    property=replicator.applier.dbms.requireacks=1
    Argument definitions:
      • profile-script=~/.bash_profile - Append commands to include env.sh in this profile script.
      • rest-api-admin-user=apiuser - Specify the initial Admin Username for API access. Available from v7.0.0.
      • rest-api-admin-password=secret - Specify the initial Admin User Password for API access. The rest-api-admin-password alias is only available from version 7.1.2 onwards. Available from v7.0.0.
      • replicator-rest-api-address=0.0.0.0 - Address for the API to bind to. Available from v7.0.0.
  5. If your MySQL source is a Tungsten Cluster, ensure the additional steps below are also included in your applier configuration.

    First, prepare the required filter configuration file as follows on the Kafka applier host(s) only:

    shell> mkdir -p /opt/continuent/share/
    shell> cp tungsten-replicator/support/filters-config/convertstringfrommysql.json /opt/continuent/share/

    Then, include the following parameters in the configuration:

    property=replicator.stage.remote-to-thl.filters=convertstringfrommysql
    property=replicator.filter.convertstringfrommysql.definitionsFile=/opt/continuent/share/convertstringfrommysql.json
  6. Once the prerequisites and configuration have been completed, the software can be installed:

    shell> ./tools/tpm install

If you encounter problems during the installation, check the output of the /tmp/tungsten-configure.log file for more information about the root cause.

Once the service is configured and running, the service can be monitored as normal using the trepctl command. See "Management and Monitoring of Kafka Deployments" for more information.

Optional Configuration Parameters for Kafka

A number of optional, configurable properties are available that control how Tungsten Replicator applies and populates information when the data is written into Kafka. The following properties can be set during configuration using --property=PROPERTYNAME=value:

Option - Description
  • replicator.applier.dbms.embedCommitTime.kafka - Sets whether the commit time for the source row is embedded into the message.
  • replicator.applier.dbms.embedSchemaTable - Embed the source schema name and table name in the stored document.
  • replicator.applier.dbms.enabletxinfo.kafka - Embeds transaction information (generated by the rowaddtxninfo filter) into each Kafka message.
  • replicator.applier.dbms.enabletxninfoTopic - Embeds transaction information into a separate Kafka message broadcast on an independent channel from the one used by the actual database data. One message is sent per transaction or THL event.
  • replicator.applier.dbms.keyFormat - Determines the format of the message ID.
  • replicator.applier.dbms.requireacks - Sets the acknowledgement counter for sending messages into the Kafka queue.
  • replicator.applier.dbms.retrycount - The number of retries for sending each message.
  • replicator.applier.dbms.txninfoTopic - Sets the topic name for transaction messages.
  • replicator.applier.dbms.zookeeperString - Connection string for Zookeeper, including hostname and port.

replicator.applier.dbms.embedCommitTime.kafka

Default: true
Valid values: true, false
Product: TR
Embeds the commit time of the source database row into the message:
{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : {
    "msg" : "Hello Kafka",
    "id" : "2384726"
  },
  "_optype" : "INSERT"
}
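
A consumer can convert the embedded commit time back into a timestamp. The following is a minimal Python sketch, not part of Tungsten Replicator: the commit_time helper is hypothetical, and the format string is an assumption inferred from the sample above. The sample carries no time zone, so the result is a naive datetime.

```python
import json
from datetime import datetime

# Sample message copied from the example above.
RAW_MESSAGE = """
{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : { "msg" : "Hello Kafka", "id" : "2384726" },
  "_optype" : "INSERT"
}
"""


def commit_time(message):
    """Parse _committime; the format is assumed from the sample message."""
    return datetime.strptime(message["_committime"], "%Y-%m-%d %H:%M:%S.%f")


message = json.loads(RAW_MESSAGE)
parsed = commit_time(message)
print(parsed.isoformat())
```

If replicator.applier.dbms.embedCommitTime.kafka is set to false, a consumer should expect the _committime field to be missing and handle the resulting KeyError.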

replicator.applier.dbms.embedSchemaTable

Default: true
Valid values: true, false
Product: TR
If enabled, the Kafka message will include the source schema and table name. This can be used to identify the source of the information if the schema and table names are not being used for the index and type names (see replicator.applier.dbms.useSchemaAsIndex and replicator.applier.dbms.useTableAsType).
{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : {
    "msg" : "Hello Kafka",
    "id" : "2384726"
  },
  "_optype" : "INSERT"
}
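
On the consuming side, the embedded names can be used to route a message back to its source table. This is a small Python sketch under one assumption: that the _source_schema and _source_table fields are simply absent when the option is disabled. The source_of helper is hypothetical and not part of any Tungsten or Kafka API.

```python
import json


def source_of(message):
    """Return (schema, table), or None when the names were not embedded."""
    schema = message.get("_source_schema")
    table = message.get("_source_table")
    if schema is None or table is None:
        return None
    return schema, table


# Cut-down messages: one with the source names embedded, one without.
with_source = json.loads(
    '{"_source_schema": "test", "_source_table": "msg", "_optype": "INSERT"}'
)
without_source = json.loads('{"_optype": "INSERT"}')

print(source_of(with_source))
print(source_of(without_source))
```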

replicator.applier.dbms.enabletxinfo.kafka

Default: false
Valid values: true, false
Product: TR
Embeds information about the entire transaction into each message sent, using the data provided by the rowaddtxninfo filter and other information embedded in each THL event. The transaction information includes row counts, the event ID, and the tables modified. Since one message is normally sent for each row of data, adding the information about the full transaction to each message makes it possible for a Kafka client to identify and validate which other messages are part of the same transaction when re-assembling them. For example, a single message in Kafka includes a txnInfo section:
{
  "_source_table" : "msg",
  "_committime" : "2018-03-07 12:53:21.0",
  "record" : {
    "msg2" : "txinfo",
    "id" : "109",
    "msg" : "txinfo"
  },
  "_optype" : "INSERT",
  "_seqno" : "164",
  "txnInfo" : {
    "schema" : [
      {
        "schemaName" : "msg",
        "rowCount" : "1",
        "tableName" : "msg"
      },
      {
        "rowCount" : "2",
        "schemaName" : "msg",
        "tableName" : "msgsub"
      }
    ],
    "serviceName" : "alpha",
    "totalCount" : "3",
    "tungstenTransId" : "164",
    "firstRecordInTransaction" : "true"
  },
  "_source_schema" : "msg"
}
This block of the overall message includes the following objects and information:
  • schema - An array of the row counts within this transaction, with a row count included for each schema and table.
  • serviceName - The name of the Tungsten Replicator service that generated the message.
  • totalCount - The total number of rows modified within the entire transaction.
  • firstRecordInTransaction - If this field exists, it should always be set to true and indicates that this message was generated by the first row inserted, updated or deleted in the overall transaction. This effectively indicates the start of the overall transaction.
  • lastRecordInTransaction - If this field exists, it should always be set to true and indicates that this message was generated by the last row inserted, updated or deleted in the overall transaction. This effectively indicates the end of the overall transaction.
Note
This information block is included in every message for each row within an overall transaction. The firstRecordInTransaction and lastRecordInTransaction fields can be used to identify the start and end of the overall transaction.
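
The re-assembly described above could be sketched in Python as follows. This is an illustration only: the TransactionAssembler class and make_row helper are hypothetical. It assumes that tungstenTransId identifies the transaction, that totalCount equals the number of row messages a client will receive for it, and that messages have already been decoded from JSON.

```python
from collections import defaultdict


class TransactionAssembler:
    """Collects per-row messages until a whole transaction has arrived."""

    def __init__(self):
        # Rows seen so far, keyed by tungstenTransId.
        self._pending = defaultdict(list)

    def add(self, message):
        """Store one decoded message; return all rows once complete, else None."""
        info = message["txnInfo"]
        txn_id = info["tungstenTransId"]
        self._pending[txn_id].append(message)
        # Counts are strings in the sample message, so convert before comparing.
        if len(self._pending[txn_id]) == int(info["totalCount"]):
            return self._pending.pop(txn_id)
        return None


def make_row(row_id, **flags):
    """Build a cut-down test message shaped like the sample above."""
    info = {"tungstenTransId": "164", "totalCount": "3", **flags}
    return {"_seqno": "164", "record": {"id": row_id}, "txnInfo": info}


assembler = TransactionAssembler()
results = [
    assembler.add(make_row("109", firstRecordInTransaction="true")),
    assembler.add(make_row("110")),
    assembler.add(make_row("111", lastRecordInTransaction="true")),
]
completed = results[-1]
print([row["record"]["id"] for row in completed])
```

Counting rows against totalCount, rather than waiting for lastRecordInTransaction, is a design choice in this sketch: it still detects a complete transaction if messages arrive out of order.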

replicator.applier.dbms.enabletxninfoTopic

Default: false
Valid values: true, false
Product: TR
If enabled, a separate message containing information about the entire transaction is sent on a Kafka topic. The topic name can be configured by setting the replicator.applier.dbms.txninfoTopic property. The default message sent will look like the following example:
{
  "txnInfo" : {
    "tungstenTransId" : "164",
    "schema" : [
      {
        "schemaName" : "msg",
        "rowCount" : "1",
        "tableName" : "msg"
      },
      {
        "schemaName" : "msg",
        "rowCount" : "2",
        "tableName" : "msgsub"
      }
    ],
    "totalCount" : "3",
    "serviceName" : "alpha"
  }
}
This block of the overall message includes the following objects and information:
  • schema - An array of the row counts within this transaction, with a row count included for each schema and table.
  • serviceName - The name of the Tungsten Replicator service that generated the message.
  • totalCount - The total number of rows modified within the entire transaction.
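
A client reading the transaction topic might cross-check the per-table counts against the total. The Python sketch below uses the sample message above; the row_counts helper is hypothetical, and the check assumes that totalCount is the sum of the per-table rowCount values, as it is in the sample.

```python
import json

# Sample transaction message copied from the example above.
RAW_TXN_MESSAGE = """
{
  "txnInfo" : {
    "tungstenTransId" : "164",
    "schema" : [
      { "schemaName" : "msg", "rowCount" : "1", "tableName" : "msg" },
      { "schemaName" : "msg", "rowCount" : "2", "tableName" : "msgsub" }
    ],
    "totalCount" : "3",
    "serviceName" : "alpha"
  }
}
"""


def row_counts(txn_message):
    """Return ({(schema, table): rows}, total) with counts converted to int."""
    info = txn_message["txnInfo"]
    per_table = {
        (entry["schemaName"], entry["tableName"]): int(entry["rowCount"])
        for entry in info["schema"]
    }
    return per_table, int(info["totalCount"])


per_table, total = row_counts(json.loads(RAW_TXN_MESSAGE))
consistent = sum(per_table.values()) == total
print(per_table, total, consistent)
```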

replicator.applier.dbms.keyFormat

Default: pkey
Valid values: pkey, pkeyus, tspkeyus, tspkey
Product: TR
Determines the format of the message ID used when sending the message into Kafka. When configured to use tspkeyus, for instance, the message ID consists of the schema name, table name, and primary key column information separated by underscores: SCHEMANAME_TABLENAME_234.
  • pkey - Combine the primary key column values into a single string.
  • pkeyus - Combine the primary key column values into a single string joined by an underscore character.
  • tspkeyus - Combine the schema name, table name, and primary key column values into a single string joined by an underscore character.
  • tspkey - Combine the schema name, table name, and primary key column values into a single string.
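
To illustrate how the four formats differ, here is a Python sketch of the key construction. It is not the applier's actual implementation: the build_key function is hypothetical, the "us" variants follow the SCHEMANAME_TABLENAME_234 example, and plain concatenation with no separator is an assumption for the other two formats.

```python
def build_key(key_format, schema, table, pkey_values):
    """Build an illustrative message ID for the given keyFormat value."""
    values = [str(value) for value in pkey_values]
    if key_format == "pkey":
        # Primary key values only, assumed to be concatenated directly.
        return "".join(values)
    if key_format == "pkeyus":
        # Primary key values only, joined by underscores.
        return "_".join(values)
    if key_format == "tspkey":
        # Schema, table and key values, assumed to be concatenated directly.
        return "".join([schema, table, *values])
    if key_format == "tspkeyus":
        # Schema, table and key values, joined by underscores.
        return "_".join([schema, table, *values])
    raise ValueError(f"unknown keyFormat: {key_format}")


for fmt in ("pkey", "pkeyus", "tspkey", "tspkeyus"):
    print(fmt, build_key(fmt, "SCHEMANAME", "TABLENAME", [234, 7]))
```

With a composite primary key, the underscore variants keep the individual column values distinguishable, which is one reason a client might prefer them.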

replicator.applier.dbms.requireacks

Default: all
Valid values: all, 1
Product: TR
Defines how many acknowledgements from Kafka nodes are required when writing messages to the Kafka cluster.

replicator.applier.dbms.retrycount

Default: 0
Product: TR
Determines the number of times sending a message is retried before it is treated as a failure.

replicator.applier.dbms.txninfoTopic

Default: tungsten_transactions
Product: TR
Sets the topic name to be used when sending independent transaction information messages about each THL event. See replicator.applier.dbms.addtxninfo.

replicator.applier.dbms.zookeeperString

Default: ${replicator.global.db.host}:2181
Product: TR
Connection string for Zookeeper, including hostname and port