Installing Kafka replication requires specific configuration of the Extractor and Applier hosts so that each is configured for the correct datasource type.
Before installing the applier, apply the following parameter to the extractor configuration.
Add the following to /etc/tungsten/tungsten.ini:
[alpha]
...Existing Replicator Config...
enable-heterogeneous-service=true
shell> tpm update
The above step only applies to standalone extractors. If you are configuring replication from an existing Tungsten Cluster (Cluster-Extractor), follow the steps outlined in Section 3.4.1, “Prepare: Replicating Data Out of a Cluster” to ensure the cluster is configured correctly.
Unpack the Tungsten Replicator distribution in a staging directory:
shell> tar zxf tungsten-replicator-7.0.3-141.tar.gz
Change into the staging directory:
shell> cd tungsten-replicator-7.0.3-141
Configure the installation using tpm:
shell> ./tools/tpm configure defaults \
    --reset \
    --install-directory=/opt/continuent \
    --profile-script=~/.bash_profile \
    --rest-api-admin-user=apiuser \
    --rest-api-admin-pass=secret
shell> ./tools/tpm configure alpha \
    --master=sourcehost \
    --members=localhost \
    --datasource-type=kafka \
    --replication-user=root \
    --replication-password=null \
    --replication-port=9092 \
    --property=replicator.applier.dbms.zookeeperString=localhost:2181 \
    --property=replicator.applier.dbms.requireacks=1
shell> vi /etc/tungsten/tungsten.ini
[defaults]
install-directory=/opt/continuent
profile-script=~/.bash_profile
rest-api-admin-user=apiuser
rest-api-admin-pass=secret
[alpha]
master=sourcehost
members=localhost
datasource-type=kafka
replication-user=root
replication-password=null
replication-port=9092
property=replicator.applier.dbms.zookeeperString=localhost:2181
property=replicator.applier.dbms.requireacks=1
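Before running the installer, it can be useful to check that the INI file contains the settings shown above. The following Python sketch is one possible pre-flight check and is not part of the Tungsten tooling. The helper names `parse_ini` and `applier_properties` are hypothetical. A small hand-written parser is used because the `property=` key is repeated within a section, which Python's `configparser` rejects in its default strict mode.

```python
from collections import defaultdict

# Sample text mirroring the INI configuration shown above.
INI_TEXT = """\
[defaults]
install-directory=/opt/continuent
profile-script=~/.bash_profile
rest-api-admin-user=apiuser
rest-api-admin-pass=secret

[alpha]
master=sourcehost
members=localhost
datasource-type=kafka
replication-user=root
replication-password=null
replication-port=9092
property=replicator.applier.dbms.zookeeperString=localhost:2181
property=replicator.applier.dbms.requireacks=1
"""


def parse_ini(text):
    """Parse INI text into {section: {key: [values]}}, keeping repeated keys."""
    sections = defaultdict(lambda: defaultdict(list))
    current = None
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", ";")):
            continue  # skip blank lines and comments
        if line.startswith("[") and line.endswith("]"):
            current = line[1:-1]
            continue
        if current is None or "=" not in line:
            continue  # ignore lines outside a section or without a value
        key, value = line.split("=", 1)
        sections[current][key.strip()].append(value.strip())
    return sections


def applier_properties(section):
    """Return the property=NAME=VALUE entries of one section as a dict."""
    return dict(entry.split("=", 1) for entry in section.get("property", []))


config = parse_ini(INI_TEXT)
alpha = config["alpha"]
props = applier_properties(alpha)
print(alpha["datasource-type"], props)
```

Every value is kept as a list so that repeated keys are not silently overwritten. A real check would read /etc/tungsten/tungsten.ini instead of the embedded sample text.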
Configuration group defaults
The description of each of the options is shown below:
--reset
For staging configurations, deletes all pre-existing configuration information before updating with the new configuration values.
--install-directory=/opt/continuent
install-directory=/opt/continuent
Path to the directory where the active deployment will be installed. The configured directory will contain the software, THL and relay log information unless configured otherwise.
--profile-script=~/.bash_profile
profile-script=~/.bash_profile
Append commands to include env.sh in this profile script
Configuration group alpha
The description of each of the options is shown below:
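--master=sourcehost
master=sourcehost
The hostname of the primary (extractor) within the current service.
--members=localhost
members=localhost
Hostnames for the dataservice members.
--datasource-type=kafka
datasource-type=kafka
Database type.
--replication-user=root
replication-user=root
For databases that require authentication, the username to use when connecting to the database using the corresponding connection method (native, JDBC, etc.).
--replication-password=null
replication-password=null
The password to be used when connecting to the database using the corresponding --replication-user.
--replication-port=9092
replication-port=9092
The network port used to connect to the database server. The default port used depends on the database being configured.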
If your MySQL source is a Tungsten Cluster, ensure the additional steps below are also included in your applier configuration.
First, prepare the required filter configuration file as follows on the Kafka applier host(s) only:
shell> mkdir -p /opt/continuent/share/
shell> cp tungsten-replicator/support/filters-config/convertstringfrommysql.json /opt/continuent/share/
Then, include the following parameters in the configuration:
property=replicator.stage.remote-to-thl.filters=convertstringfrommysql
property=replicator.filter.convertstringfrommysql.definitionsFile=/opt/continuent/share/convertstringfrommysql.json
If you plan to make full use of the REST API (which is enabled by default) you will need to also configure a username and password for API access. This must be done by specifying the following options in your configuration:
rest-api-admin-user=tungsten
rest-api-admin-pass=secret
Once the prerequisites and the configuration of the installation have been completed, the software can be installed:
shell> ./tools/tpm install
If you encounter problems during the installation, check the output of the /tmp/tungsten-configure.log file for more information about the root cause.
Once the service is configured and running, the service can be monitored as normal using the trepctl command. See Section 4.4.3, “Management and Monitoring of Kafka Deployments” for more information.
A number of optional, configurable properties are available that control how Tungsten Replicator applies and populates information when the data is written into Kafka. The following properties can be set during configuration using --property=PROPERTYNAME=value:
Table 4.1. Optional Kafka Applier Properties
Option | Description |
---|---|
replicator.applier.dbms.embedCommitTime | Sets whether the commit time for the source row is embedded into the document |
replicator.applier.dbms.embedSchemaTable | Embed the source schema name and table name in the stored document |
replicator.applier.dbms.enabletxinfo.kafka | Embeds transaction information (generated by the rowaddtxninfo filter) into each Kafka message |
replicator.applier.dbms.enabletxninfoTopic | Embeds transaction information into a separate Kafka message broadcast on an independent channel from the one used by the actual database data. One message is sent per transaction or THL event. |
replicator.applier.dbms.keyFormat | Determines the format of the message ID |
replicator.applier.dbms.requireacks | Defines how many acknowledgements from Kafka nodes are required when writing messages to the Kafka cluster |
replicator.applier.dbms.retrycount | The number of retries for sending each message |
replicator.applier.dbms.txninfoTopic | Sets the topic name for transaction messages |
replicator.applier.dbms.zookeeperString | Connection string for Zookeeper, including hostname and port |
replicator.applier.dbms.embedCommitTime
Option | replicator.applier.dbms.embedCommitTime | |
---|---|---|
Description | Sets whether the commit time for the source row is embedded into the document | |
Value Type | boolean | |
Default | true | |
Valid Values | false | Do not embed the source database commit time |
 | true | Embed the source database commit time into the stored document |
Embeds the commit time of the source database row into the document information:
{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : {
    "msg" : "Hello Kafka",
    "id" : "2384726"
  },
  "_optype" : "INSERT"
}
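As an illustration of how a Kafka client might use these fields, the following Python sketch decodes the sample message above and converts `_committime` into a `datetime`. The helper name `parse_message` and the timestamp format string are assumptions derived only from the sample shown here. The time zone of the value is not stated in this section, so the result is left as a naive `datetime`.

```python
import json
from datetime import datetime

# The sample message from above, as it might be received from Kafka.
SAMPLE = """{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : { "msg" : "Hello Kafka", "id" : "2384726" },
  "_optype" : "INSERT"
}"""

# Assumed layout of _committime, inferred from the sample value only.
COMMIT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def parse_message(raw):
    """Decode one message body and return (metadata, record)."""
    doc = json.loads(raw)
    meta = {
        "seqno": int(doc["_seqno"]),
        "optype": doc["_optype"],
        "committime": None,
    }
    # The field is only present when embedCommitTime is enabled.
    if "_committime" in doc:
        meta["committime"] = datetime.strptime(doc["_committime"], COMMIT_TIME_FORMAT)
    return meta, doc["record"]


meta, record = parse_message(SAMPLE)
print(meta["seqno"], meta["committime"].isoformat(), record["msg"])
```

Note that all values in the sample, including the sequence number and the record ID, arrive as JSON strings, so numeric fields must be converted explicitly.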
replicator.applier.dbms.embedSchemaTable
Option | replicator.applier.dbms.embedSchemaTable | |
---|---|---|
Description | Embed the source schema name and table name in the stored document | |
Value Type | boolean | |
Default | true | |
Valid Values | false | Do not embed the schema or table name in the document |
 | true | Embed the source schema name and table name into the stored document |
If enabled, the document written to Kafka will include the source schema and table name. This can be used to identify the source of the information.
{
  "_seqno" : "4865",
  "_source_table" : "msg",
  "_committime" : "2017-07-13 15:30:37.0",
  "_source_schema" : "test",
  "record" : {
    "msg" : "Hello Kafka",
    "id" : "2384726"
  },
  "_optype" : "INSERT"
}
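A consumer can use the `_source_schema` and `_source_table` fields to route or group records by their origin. The Python sketch below is a minimal illustration. The function names are hypothetical, the sample messages are invented for the example, and it assumes that both fields are simply absent when the property is disabled.

```python
import json
from collections import defaultdict


def source_of(doc):
    """Return 'schema.table' for a decoded message, or None if the fields are absent."""
    schema = doc.get("_source_schema")
    table = doc.get("_source_table")
    if schema is None or table is None:
        return None
    return f"{schema}.{table}"


def group_by_source(raw_messages):
    """Group the record payloads of several messages by their source table."""
    groups = defaultdict(list)
    for raw in raw_messages:
        doc = json.loads(raw)
        groups[source_of(doc)].append(doc["record"])
    return dict(groups)


# Invented sample messages; the last one carries no source information.
messages = [
    '{"_source_schema": "test", "_source_table": "msg", "record": {"id": "1"}, "_optype": "INSERT"}',
    '{"_source_schema": "test", "_source_table": "msg", "record": {"id": "2"}, "_optype": "INSERT"}',
    '{"record": {"id": "3"}, "_optype": "INSERT"}',
]
grouped = group_by_source(messages)
print(grouped)
```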
replicator.applier.dbms.enabletxinfo.kafka
Option | replicator.applier.dbms.enabletxinfo.kafka | |
---|---|---|
Description | Embeds transaction information (generated by the rowaddtxninfo filter) into each Kafka message | |
Value Type | boolean | |
Default | false | |
Valid Values | false | Do not include transaction information in each Kafka message |
 | true | Embed transaction information into each Kafka message |
Embeds information about the entire transaction into each message sent, using the data provided by the rowaddtxninfo filter and other information embedded in each THL event. The transaction information includes row counts, the event ID and the tables modified. Since one message is normally sent for each row of data, adding the information about the full transaction to each message makes it possible to validate and identify which other messages are part of the same transaction when the messages are being re-assembled by a Kafka client.
For example, when looking at a single message in Kafka, the message includes a txnInfo section:
{
  "_source_table" : "msg",
  "_committime" : "2018-03-07 12:53:21.0",
  "record" : {
    "msg2" : "txinfo",
    "id" : "109",
    "msg" : "txinfo"
  },
  "_optype" : "INSERT",
  "_seqno" : "164",
  "txnInfo" : {
    "schema" : [
      {
        "schemaName" : "msg",
        "rowCount" : "1",
        "tableName" : "msg"
      },
      {
        "rowCount" : "2",
        "schemaName" : "msg",
        "tableName" : "msgsub"
      }
    ],
    "serviceName" : "alpha",
    "totalCount" : "3",
    "tungstenTransId" : "164",
    "firstRecordInTransaction" : "true"
  },
  "_source_schema" : "msg"
}
This block of the overall message includes the following objects and information:
schema
An array of the row counts within this transaction, with a row count included for each schema and table.
serviceName
The name of the Tungsten Replicator service that generated the message.
totalCount
The total number of rows modified within the entire transaction.
firstRecordInTransaction
If this field exists, it should always be set to true and indicates that this message was generated by the first row inserted, updated or deleted in the overall transaction. This effectively indicates the start of the overall transaction.
lastRecordInTransaction
If this field exists, it should always be set to true and indicates that this message was generated by the last row inserted, updated or deleted in the overall transaction. This effectively indicates the end of the overall transaction.
Note that this information block is included in every message for each row within an overall transaction. The firstRecordInTransaction and lastRecordInTransaction fields can be used to identify the start and end of the overall transaction.
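The re-assembly described above can be sketched in Python as follows. This is a minimal illustration, not a reference client. The class `TransactionAssembler` and the helper `make_row` are hypothetical, and the sketch assumes that `tungstenTransId` identifies the transaction and that exactly one message arrives per row, so a transaction is treated as complete once `totalCount` messages have been received.

```python
from collections import defaultdict


class TransactionAssembler:
    """Buffers per-row messages until every row of a transaction has arrived."""

    def __init__(self):
        # Maps a transaction ID to the decoded messages received so far.
        self._pending = defaultdict(list)

    def add(self, doc):
        """Add one decoded message.

        Returns the list of messages for the transaction once the number
        received matches totalCount, otherwise None.
        """
        info = doc["txnInfo"]
        txn_id = info["tungstenTransId"]  # assumed to identify the transaction
        self._pending[txn_id].append(doc)
        if len(self._pending[txn_id]) == int(info["totalCount"]):
            return self._pending.pop(txn_id)
        return None


def make_row(row_id, first=False, last=False):
    """Build an invented message for transaction 164, which has three rows."""
    info = {"serviceName": "alpha", "totalCount": "3", "tungstenTransId": "164"}
    if first:
        info["firstRecordInTransaction"] = "true"
    if last:
        info["lastRecordInTransaction"] = "true"
    return {"_seqno": "164", "_optype": "INSERT",
            "record": {"id": row_id}, "txnInfo": info}


assembler = TransactionAssembler()
results = [
    assembler.add(make_row("109", first=True)),
    assembler.add(make_row("110")),
    assembler.add(make_row("111", last=True)),
]
print([None if r is None else len(r) for r in results])
```

The sketch counts messages rather than waiting for lastRecordInTransaction because Kafka only guarantees ordering within a single partition. If the rows of a transaction are spread across partitions, the last row may not be the last message to arrive. Duplicate deliveries are not handled here.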
replicator.applier.dbms.enabletxninfoTopic
Option | replicator.applier.dbms.enabletxninfoTopic | |
---|---|---|
Description | Embeds transaction information into a separate Kafka message broadcast on an independent channel from the one used by the actual database data. One message is sent per transaction or THL event. | |
Value Type | boolean | |
Default | false | |
Valid Values | false | Do not generate transaction information |
 | true | Send transaction information on a separate Kafka topic for each transaction |
If enabled, a separate message containing information about the entire transaction is sent on a Kafka topic. The topic name can be configured by setting the replicator.applier.dbms.txninfoTopic property.
The default message sent will look like the following example:
{
  "txnInfo" : {
    "tungstenTransId" : "164",
    "schema" : [
      {
        "schemaName" : "msg",
        "rowCount" : "1",
        "tableName" : "msg"
      },
      {
        "schemaName" : "msg",
        "rowCount" : "2",
        "tableName" : "msgsub"
      }
    ],
    "totalCount" : "3",
    "serviceName" : "alpha"
  }
}
This block of the overall message includes the following objects and information:
schema
An array of the row counts within this transaction, with a row count included for each schema and table.
serviceName
The name of the Tungsten Replicator service that generated the message.
totalCount
The total number of rows modified within the entire transaction.
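A client reading the transaction topic could summarize each message and check that it is internally consistent. The Python sketch below uses the sample message above. The function name `summarize_txn` is hypothetical, and the check that the per-table row counts add up to `totalCount` is an assumption based on the sample, where 1 + 2 = 3.

```python
import json

# The sample transaction message from above.
SAMPLE = """{
  "txnInfo" : {
    "tungstenTransId" : "164",
    "schema" : [
      { "schemaName" : "msg", "rowCount" : "1", "tableName" : "msg" },
      { "schemaName" : "msg", "rowCount" : "2", "tableName" : "msgsub" }
    ],
    "totalCount" : "3",
    "serviceName" : "alpha"
  }
}"""


def summarize_txn(raw):
    """Return the per-table row counts of one transaction message."""
    info = json.loads(raw)["txnInfo"]
    per_table = {
        (entry["schemaName"], entry["tableName"]): int(entry["rowCount"])
        for entry in info["schema"]
    }
    total = int(info["totalCount"])
    return {
        "transaction": info["tungstenTransId"],
        "service": info["serviceName"],
        "per_table": per_table,
        "total": total,
        # Assumed invariant: per-table counts add up to the overall total.
        "consistent": sum(per_table.values()) == total,
    }


summary = summarize_txn(SAMPLE)
print(summary)
```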
replicator.applier.dbms.keyFormat
Option | replicator.applier.dbms.keyFormat | |
---|---|---|
Description | Determines the format of the message ID | |
Value Type | string | |
Default | pkey | |
Valid Values | pkey | Combine the primary key column values into a single string |
 | pkeyus | Combine the primary key column values into a single string joined by an underscore character |
 | tspkey | Combine the schema name, table name, and primary key column values into a single string |
 | tspkeyus | Combine the schema name, table name, and primary key column values into a single string joined by an underscore character |
Determines the format of the message ID used when sending the message into Kafka. For example, when configured to use tspkeyus, the message ID consists of the schema name, table name and primary key column information separated by underscores, such as SCHEMANAME_TABLENAME_234.
replicator.applier.dbms.requireacks
Option | replicator.applier.dbms.requireacks | |
---|---|---|
Description | Defines how many acknowledgements from Kafka nodes are required when writing messages to the Kafka cluster | |
Value Type | string | |
Default | all | |
Valid Values | 1 | Only the lead host should acknowledge receipt of the message |
 | all | All nodes should acknowledge receipt of the message |
Sets the acknowledgement counter for sending messages into the Kafka queue.
replicator.applier.dbms.retrycount
Option | replicator.applier.dbms.retrycount |
---|---|
Description | The number of retries for sending each message |
Value Type | number |
Default | 0 |
Determines the number of times sending a message will be retried before it is treated as a failure.
replicator.applier.dbms.txninfoTopic
Option | replicator.applier.dbms.txninfoTopic |
---|---|
Description | Sets the topic name for transaction messages |
Value Type | string |
Default | tungsten_transactions |
Sets the topic name to be used when sending independent transaction information messages about each THL event. See replicator.applier.dbms.enabletxninfoTopic.
replicator.applier.dbms.zookeeperString
Option | replicator.applier.dbms.zookeeperString |
---|---|
Description | Connection string for Zookeeper, including hostname and port |
Value Type | string |
Default | ${replicator.global.db.host}:2181 |
The string to be used when connecting to Zookeeper. The default is to use port 2181 on the host used by replicator.global.db.host.