11.4.29. NetworkClient Filter

The NetworkClientFilter sends the data in selected columns to an external network server for processing.

Pre-configured filter name   networkclient
Classname                    com.continuent.tungsten.replicator.filter.NetworkClientFilter
Property prefix              replicator.filter.networkclient
Stage compatibility          Any
tpm Option compatibility     --svc-extractor-filters, --svc-thl-filters, --svc-applier-filters
Data compatibility           Row events

Parameters

Parameter        Type      Default                                                            Description
definitionsFile  pathname  ${replicator.home.dir}/samples/extensions/java/networkclient.json  The name of a file containing the definitions for how columns should be processed by filters
serverPort       number    3112                                                               The network port to use when communicating with the network client
timeout          number    10                                                                 Timeout in seconds before treating the network client as failed when waiting to send or receive content.

The network filter operates by sending field data, as defined in the corresponding filter configuration file, to a network server that processes the information and returns it, so that it can be re-introduced in place of the original field data. This can be used to translate and reformat information during replication.

The filter operation works as follows:

  • All filtered data will be sent to a single network server, at the configured port.

  • A single network server can be used to provide multiple transformations.

  • The JSON configuration file for the filter supports multiple types and multiple column definitions.

  • The network server must follow the protocol used by the network filter for the information to be processed correctly. A failure in the network server or in communication will cause the replicator to raise an error and replication to go OFFLINE.

  • The network server must be running before the replicator is started. If the network server cannot be found, replication will go OFFLINE.

Correct operation requires building a suitable network filter using the defined protocol, and creating the JSON configuration file. A sample filter is provided for reference.

11.4.29.1. Network Client Configuration

The configuration file defines the translation operation to be requested from the network server, together with the schema, table and column names to which it applies. The file is in JSON format, with each top-level key naming an operation, and its value holding an array of field selections, one for each set of fields that should be processed by that operation. For example:

{
   "String_to_HEX_v1" : [
      {
         "table" : "hextable",
         "schema" : "hexdb",
         "columns" : [
            "hexcol"
         ]
      }
   ]
}

The operation in this case is String_to_HEX_v1; this will be sent to the network server as part of the request. The column definition follows.

To send multiple columns from different tables to the same translation:

{
   "String_to_HEX_v1" : [
      {
         "table" : "hextable",
         "schema" : "hexdb",
         "columns" : [
            "hexcol"
         ]
      },
      {
         "table" : "hexagon",
         "schema" : "sourcetext",
         "columns" : [
            "itemtext"
         ]
      }
   ]
}

Alternatively, to configure different operations for the same two tables:

{
   "String_to_HEX_v1" : [
      {
         "table" : "hextable",
         "schema" : "hexdb",
         "columns" : [
            "hexcol"
         ]
      }
   ],
   "HEX_to_String_v1" : [
      {
         "table" : "hexagon",
         "schema" : "sourcetext",
         "columns" : [
            "itemtext"
         ]
      }
   ]
}
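The structure of the configuration file can be illustrated by flattening it into a lookup from column to operation. The following is a minimal sketch in Python (rather than the Perl used for the sample server), and is not how the filter itself reads the file; the function name build_column_map is hypothetical and used only for illustration.

```python
import json

def build_column_map(definitions_text):
    """Map (schema, table, column) to the operation configured for it."""
    definitions = json.loads(definitions_text)
    column_map = {}
    # Each top-level key is an operation name; its value is a list of selections.
    for operation, selections in definitions.items():
        for selection in selections:
            for column in selection["columns"]:
                key = (selection["schema"], selection["table"], column)
                column_map[key] = operation
    return column_map

# The two-operation example from this section, in compact form.
example = """
{
   "String_to_HEX_v1" : [
      { "table" : "hextable", "schema" : "hexdb", "columns" : [ "hexcol" ] }
   ],
   "HEX_to_String_v1" : [
      { "table" : "hexagon", "schema" : "sourcetext", "columns" : [ "itemtext" ] }
   ]
}
"""

column_map = build_column_map(example)
print(column_map[("hexdb", "hextable", "hexcol")])
```

A lookup of this kind makes it easy to check, before starting the replicator, which operation each column will be sent to the network server with.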

11.4.29.2. Network Filter Protocol

The network filter protocol is designed to be both lightweight and compatible with binary data, because it must work with data that may be heavily encoded, binary, or compressed in nature.

The protocol operates through a combined JSON and optional binary payload structure that communicates the information. The JSON defines the communication type and metadata, while the binary payload contains the raw or translated information.
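The combined structure can be sketched as a pair of helper functions that write and read one packet. This is an illustrative sketch in Python, not the replicator's implementation. It assumes that the JSON is sent as a single line terminated by a newline, which is how the sample Perl server in this section reads and writes it, and that a packet without a payload carries a payload value of -1. The names encode_packet and decode_packet are hypothetical.

```python
import io
import json

def encode_packet(header, payload=b""):
    """Serialize a header dict and payload bytes into one packet."""
    header = dict(header)
    # Assumption: -1 marks "no payload", as in the prepare message.
    header["payload"] = len(payload) if payload else -1
    # Assumption: the JSON occupies one line, ended by a newline.
    return json.dumps(header).encode("utf-8") + b"\n" + payload

def decode_packet(stream):
    """Read one packet from a binary stream and return (header, payload)."""
    line = stream.readline()
    if not line:
        return None, b""          # end of stream
    header = json.loads(line.decode("utf-8"))
    size = header.get("payload", -1)
    # The payload field says how many bytes follow the JSON.
    payload = stream.read(size) if size > 0 else b""
    return header, payload

packet = encode_packet({"type": "filter", "protocol": "v0_9"}, b"Hello World")
header, payload = decode_packet(io.BytesIO(packet))
print(header["payload"], payload)
```

Because the payload length is carried in the JSON, the payload itself never needs to be escaped or inspected, which is what allows arbitrary binary content to be passed through.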

The filter communicates with the network server using the following packet types:

  • prepare

    The prepare message is sent when the filter goes online, and is designed to initialize the connection to the network server and confirm the supported filter types and operation. The format of the prepare message is:

    {
       "payload" : -1,
       "type" : "prepare",
       "service" : "firstrep",
       "protocol" : "v0_9"
    }

    Where:

    • protocol

      The protocol version.

    • service

      The name of the replicator service that called the filter.

    • type

      The message type.

    • payload

      The size of the payload; a value of -1 indicates that there is no payload.

    The format of the response should be a JSON object and payload with the list of supported filter types in the payload section. The payload immediately follows the JSON, with the size of the list defined within the payload field of the returned JSON object:

    {
       "payload" : 22,
       "type" : "acknowledged",
       "protocol" : "v0_9",
       "service" : "firstrep",
       "return" : 0
    }Perl_BLOB_to_String_v1

    Where:

    • protocol

      The protocol version.

    • service

      The name of the replicator service that called the filter.

    • type

      The message type; when acknowledging the original prepare request it should be acknowledged.

    • return

      The return value. A value of 0 (zero) indicates no faults. Any non-zero value indicates there was an issue.

    • payload

      The length of the appended payload information in bytes. This is used by the filter to identify how much additional data to read after the JSON object has been read.

    The payload should be a comma-separated list of the supported transformation types within the network server.

  • filter

    The filter message type is sent by Tungsten Replicator for each value from the replication stream that needs to be filtered and translated in some way. The format of the request is a JSON object with a trailing block of data, the payload, that contains the information to be filtered. For example:

    {
       "schema" : "hexdb",
       "transformation" : "String_to_HEX_v1",
       "service" : "firstrep",
       "type" : "filter",
       "payload" : 22,
       "row" : 0,
       "column" : "hexcol",
       "table" : "hextable",
       "seqno" : 145196,
       "fragments" : 1,
       "protocol" : "v0_9",
       "fragment" : 1
    }48656c6c6f20576f726c64

    Where:

    • protocol

      The protocol version.

    • service

      The name of the service that requested the filter.

    • type

      The message type, in this case, filter.

    • row

      The row of the source information from the THL that is being filtered.

    • schema

      The schema of the source information from the THL that is being filtered.

    • table

      The table of the source information from the THL that is being filtered.

    • column

      The column of the source information from the THL that is being filtered.

    • seqno

      The sequence number of the event from the THL that is being filtered.

    • fragments

      The number of fragments in the THL event that is being filtered.

    • fragment

      The fragment number within the THL that is being filtered. The fragments may be sent individually and sequentially to the network server, so they may need to be retrieved, merged, and reconstituted depending on the nature of the source data and the filter being applied.

    • transformation

      The transformation to be performed on the supplied payload data. A single network server can support multiple transformations, so this information is provided to perform the correct operation. The actual transformation to be performed is taken from the JSON configuration file for the filter.

    • payload

      The length, in bytes, of the payload data that will immediately follow the JSON filter request.

    The payload that immediately follows the JSON block is the data from the column that should be processed by the network filter.

    The response packet should contain a copy of the information supplied in the filter request, with the payload size updated to the size of the returned information, the message type changed to filtered, a return value added, and the payload containing the translated data. For example:

    {
       "transformation" : "String_to_HEX_v1",
       "fragments" : 1,
       "type" : "filtered",
       "fragment" : 1,
       "return" : 0,
       "seqno" : 145198,
       "table" : "hextable",
       "service" : "firstrep",
       "protocol" : "v0_9",
       "schema" : "hexdb",
       "payload" : 8,
       "column" : "hexcol",
       "row" : 0
    }FILTERED
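The two responses described above can be sketched as follows. This is an illustrative sketch in Python under stated assumptions, not a reference implementation: the JSON is written as a single line followed by a newline, as the sample Perl server does, and the function names build_acknowledged and build_filtered are hypothetical.

```python
import json

def build_acknowledged(prepare_msg, supported):
    """Build the response to a prepare message."""
    # The payload is a comma-separated list of supported transformations.
    payload = ",".join(supported).encode("utf-8")
    header = {
        "protocol": prepare_msg["protocol"],
        "type": "acknowledged",
        "service": prepare_msg["service"],
        "return": 0,
        "payload": len(payload),
    }
    return json.dumps(header).encode("utf-8") + b"\n" + payload

def build_filtered(request, translated):
    """Build the response to a filter message."""
    header = dict(request)               # copy the request metadata
    header["type"] = "filtered"          # change the message type
    header["return"] = 0                 # 0 indicates no faults
    header["payload"] = len(translated)  # size of the returned data
    return json.dumps(header).encode("utf-8") + b"\n" + translated

prepare = {"payload": -1, "type": "prepare",
           "service": "firstrep", "protocol": "v0_9"}
ack = build_acknowledged(prepare, ["String_to_HEX_v1", "HEX_to_String_v1"])

request = {"schema": "hexdb", "transformation": "String_to_HEX_v1",
           "service": "firstrep", "type": "filter", "payload": 22, "row": 0,
           "column": "hexcol", "table": "hextable", "seqno": 145196,
           "fragments": 1, "protocol": "v0_9", "fragment": 1}
reply = build_filtered(request, b"FILTERED")
print(ack)
print(reply)
```

Copying the request and changing only type, return and payload keeps the schema, table, column, row, seqno and fragment fields identical to those supplied in the request.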

11.4.29.3. Sample Network Client

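The following sample network server script is written in Perl, and is designed to translate packed hex strings (two hex characters per byte) from their hex representation into their character representation. As written, the script demonstrates the protocol only: the filter handler returns the fixed string FILTERED in place of the translated data.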

#!/usr/bin/perl

use strict;
use warnings;

use IO::Socket::INET;
use JSON qw( decode_json encode_json );

# Disable output buffering on STDOUT so that log messages appear immediately
$| = 1;

# The single transformation advertised by this server
my $serverName = "Perl_BLOB_to_String_v1";

# Create the listening socket once, before accepting connections
my $socket = IO::Socket::INET->new(
    LocalHost => '0.0.0.0',
    LocalPort => 3112,
    Proto     => 'tcp',
    Listen    => 5,
    ReuseAddr => 1
);
die "Cannot create socket $!\n" unless $socket;

while (1)
{
    print "********\nServer waiting for client connection on port 3112\n******\n\n\n";

    # Wait for a new client connection
    my $client_socket = $socket->accept() or next;

    # Get information about the newly connected client
    my $client_address = $client_socket->peerhost();
    my $client_port    = $client_socket->peerport();
    print "Connection from $client_address:$client_port\n";

    # Each message starts with a single line of JSON
    while (defined(my $data = $client_socket->getline()))
    {
        chomp($data);
        print "\n\nReceived: <$data>\n";

        # Decode the JSON part
        my $msg = decode_json($data);

        # Read the payload, if any, that follows the JSON
        my $payload = '';
        if (defined $msg->{payload} && $msg->{payload} > 0)
        {
            print STDERR "Reading $msg->{payload} bytes\n";
            $client_socket->read($payload, $msg->{payload});
            print "Payload: <$payload>\n";
        }

        my $type = $msg->{type} // '';

        if ($type eq "prepare")
        {
            print STDERR "Received prepare request\n";

            # Acknowledge, with the supported transformation as the payload
            my $out = encode_json({
                "protocol" => "v0_9",
                "type"     => "acknowledged",
                "return"   => 0,
                "service"  => $msg->{service},
                "payload"  => length($serverName)
            });

            print $client_socket "$out\n$serverName";
            print "Sent: <$out>\n";
            print STDERR "Sent acknowledged response\n";
        }
        elsif ($type eq "release")
        {
            # Acknowledge with an empty payload
            my $out = encode_json({
                "protocol" => "v0_9",
                "type"     => "acknowledged",
                "return"   => 0,
                "service"  => $msg->{service},
                "payload"  => 0
            });

            print $client_socket "$out\n";
            print "Sent: <$out>\n";
        }
        elsif ($type eq "filter")
        {
            print STDERR "Sending filtered payload\n";

            # Placeholder result; a real server would transform $payload here
            my $filtered = "FILTERED";

            # Echo the request metadata with the type and payload size updated
            my $out = encode_json({
                "protocol"       => "v0_9",
                "type"           => "filtered",
                "transformation" => $msg->{transformation},
                "return"         => 0,
                "service"        => $msg->{service},
                "seqno"          => $msg->{seqno},
                "row"            => $msg->{row},
                "schema"         => $msg->{schema},
                "table"          => $msg->{table},
                "column"         => $msg->{column},
                "fragment"       => 1,
                "fragments"      => 1,
                "payload"        => length($filtered)
            });

            print "About to send: <$out>\n";
            print $client_socket "$out\n$filtered";
            print "Response sent\n";
        }

        print "End of loop, waiting for next packet\n";
    }

    # Notify the client that we are done writing, then close the connection
    shutdown($client_socket, 1);
    $client_socket->close();
}
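The sample returns the fixed string FILTERED for every request. A server that performs real work would instead select a handler from the transformation field of the request. The following is a minimal sketch of that dispatch in Python. The behaviour of the two transformations is an assumption inferred from their names in the configuration examples, and the function names are hypothetical.

```python
import binascii

def string_to_hex(data):
    """Assumed meaning of String_to_HEX_v1: two hex characters per byte."""
    return binascii.hexlify(data)

def hex_to_string(data):
    """Assumed meaning of HEX_to_String_v1: the reverse operation."""
    return binascii.unhexlify(data)

# Map each advertised transformation name to its handler.
TRANSFORMATIONS = {
    "String_to_HEX_v1": string_to_hex,
    "HEX_to_String_v1": hex_to_string,
}

def apply_transformation(name, payload):
    """Apply the named transformation to the payload bytes."""
    try:
        handler = TRANSFORMATIONS[name]
    except KeyError:
        raise ValueError("unsupported transformation: " + name)
    return handler(payload)

print(apply_transformation("HEX_to_String_v1", b"48656c6c6f20576f726c64"))
```

The keys of the dispatch table are also the natural source for the comma-separated list returned in the payload of the acknowledged response.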