6.10.1. Preparing Hosts for Kafka Replication

In general, it is easier to understand that a row within the MySQL table is converted into a single message on the Kafka side, the topic used is made up of the schema name and table name, and the message ID is composed of the primary key information, but can optionally include the schema and table name and primary key information.

For example, the following row within MySQL:

mysql> select * from messages where id = 99999 \G
*************************** 1. row ***************************
 id: 99999
 msg: Hello Kafka
 1 row in set (0.00 sec)

Is replicated into Kafka as a Kafka message using the topic test_msg:

{
   "_seqno" : "4865",
   "_source_table" : "msg",
   "_committime" : "2017-07-13 15:30:37.0",
   "_source_schema" : "test",
   "record" : {
      "msg" : "Hello Kafka",
      "id" : "2384726"
   },
   "_optype" : "INSERT"
}

In the output, the record contains the actualy record data, the other fields in the message are:

  • _seqno — the THL sequence number of the transaction.

  • _source_table — the source table. Inclusion of this information is optional.

  • _committime — the original transaction commit time. Inclusion of this information is optional.

  • _source_schema — the source schema. Inclusion of this information is optional.

  • _optype — the operation type (INSERT, UPDATE, DELETE).

When preparing the hosts you must be aware of this translation of the different structures, as it will have an effect on the way the information is replicated from MySQL to Kafka.

MySQL Host

The data replicated from MySQL can be any data, although there are some known limitations and assumptions made on the way the information is transferred.

The following are required for replication to Kafka:

  • MySQL must be using Row-based replication for information to be replicated to Kafka. For the best results, you should change the global binary log format, ideally in the configuration file (my.cnf):

    binlog-format = row

    Alternatively, the global binlog format can be changed by executing the following statement:

    mysql> SET GLOBAL binlog_format = ROW;

    For MySQL 5.6.2 and later, you must enable full row log images:

    binlog-row-image = full

    Alternatively, the global binlog row image setting can be changed by executing the following statement:

    mysql> SET GLOBAL binlog_row_image = full;

    This information will be forgotten when the MySQL server is restarted; placing the configuration settings into the my.cnf file will ensure they will survive a restart of the MySQL server or OS.

  • Table format should be updated to UTF8 by updating the MySQL configuration (my.cnf):

    character-set-server=utf8
    collation-server=utf8_general_ci

    Tables must also be configured as UTF8 tables, and existing tables should be updated to UTF8 support before they are replicated to prevent character set corruption issues.

  • To prevent timezone configuration storing zone adjusted values and exporting this information to the binary log and Kafka, fix the timezone configuration to use UTC within the configuration file (my.cnf):

    default-time-zone='+00:00'

For the best results when replicating, be aware of the following issues and limitations:

  • Use primary keys on all tables. The use of primary keys will improve the lookup of information within Kafka when rows are updated. Without a primary key on a table a full table scan is performed, which can affect performance.

  • MySQL TEXT columns are correctly replicated, but cannot be used as keys.

  • MySQL BLOB columns are converted to text using the configured character type. Depending on the data that is being stored within the BLOB, the data may need to be custom converted. A filter can be written to convert and reformat the content as required.

Kafka Host

On the Kafka side, status information is stored into the Zookeeper instance used for configuring Kafka, and the Zookeeper and Kafka instances must be up and running before the replicator is first started. There are no specific configuration elements required on the Kafka host.