NetworkClient Filter
The NetworkClientFilter processes data in selected columns by sending it to an external network server for transformation.
| Pre-configured filter name | networkclient |
|---|---|
| Classname | com.continuent.tungsten.replicator.filter.NetworkClientFilter |
| Property prefix | replicator.filter.networkclient |
| Stage compatibility | Any |
| tpm Option compatibility | svc-extractor-filters, svc-thl-filters, svc-applier-filters |
| Data compatibility | Row events |
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| definitionsFile | pathname | ${replicator.home.dir}/samples/extensions/java/networkclient.json | The name of the JSON file that defines which columns are processed, and how |
| serverPort | number | 3112 | The network port used to communicate with the network server |
| timeout | number | 10 | Time, in seconds, to wait when sending content to or receiving content from the network server before treating it as failed |
The network filter operates by sending field data, as defined in the corresponding filter configuration file, to a network server that processes the information and sends it back to be re-introduced in place of the original field data. This can be used to translate and reformat information during replication.
The filter operation works as follows:
- All filtered data will be sent to a single network server, at the configured port.
- A single network server can be used to provide multiple transformations.
- The JSON configuration file for the filter supports multiple types and multiple column definitions.
- The protocol used by the network filter must be followed to process the information correctly. A failure in the network server or in communication with it will cause the replicator to raise an error and replication to go OFFLINE.
- The network server must be running before the replicator is started. If the network server cannot be found, replication will go OFFLINE.
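Because replication goes OFFLINE when the network server cannot be reached, it can help to confirm that the server is listening before starting the replicator. The following Perl sketch only opens and closes a TCP connection; it does not exercise the filter protocol. The helper name network_server_is_up and the default host 127.0.0.1 are assumptions, while the port (3112) and timeout (10 seconds) mirror the serverPort and timeout defaults in the parameter table.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Hypothetical pre-flight check: returns 1 if a TCP connection to the
# network server can be opened within $timeout seconds, 0 otherwise.
# Port and timeout defaults follow the documented serverPort (3112) and
# timeout (10) parameters; the default host is an assumption.
sub network_server_is_up {
    my ($host, $port, $timeout) = @_;
    $host    //= '127.0.0.1';
    $port    //= 3112;
    $timeout //= 10;
    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => 'tcp',
        Timeout  => $timeout,
    );
    return 0 unless $sock;
    close($sock);
    return 1;
}

# Example: exit non-zero if the server is not listening yet.
# exit(network_server_is_up() ? 0 : 1);
```

A server that handles one client at a time simply sees the probe connect and disconnect, then returns to waiting for the next connection.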
Correct operation requires building a suitable network server that implements the defined protocol, and creating the JSON configuration file. A sample network server is provided for reference.
Network Client Configuration
The configuration file defines the translation operation to be requested from the network server, together with the schema, table, and column names to which it applies. The file is in JSON format: each top-level key names an operation, and its value is an array of field selections identifying the columns to be processed by that operation. For example:
{
  "String_to_HEX_v1" : [
    {
      "table" : "hextable",
      "schema" : "hexdb",
      "columns" : [
        "hexcol"
      ]
    }
  ]
}
The operation in this case is String_to_HEX_v1; this name is sent to the network server as part of each request. The column definition follows, selecting the hexcol column of the hextable table in the hexdb schema.
To send multiple columns from different tables to the same translation:
{
  "String_to_HEX_v1" : [
    {
      "table" : "hextable",
      "schema" : "hexdb",
      "columns" : [
        "hexcol"
      ]
    },
    {
      "table" : "hexagon",
      "schema" : "sourcetext",
      "columns" : [
        "itemtext"
      ]
    }
  ]
}
Alternatively, to configure different operations for the same two tables:
{
  "String_to_HEX_v1" : [
    {
      "table" : "hextable",
      "schema" : "hexdb",
      "columns" : [
        "hexcol"
      ]
    }
  ],
  "HEX_to_String_v1" : [
    {
      "table" : "hexagon",
      "schema" : "sourcetext",
      "columns" : [
        "itemtext"
      ]
    }
  ]
}
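The definitions file is effectively a map from operation name to column selections. As an illustration only, the following Perl sketch inverts it into a lookup keyed by schema.table.column, which makes it easy to check which operation a given column will receive. The function name build_column_map and the dotted key format are assumptions, not part of the filter; JSON::PP is the JSON module bundled with Perl.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Hypothetical helper: invert the definitions-file layout
#   { "<operation>" : [ { "schema", "table", "columns" : [...] }, ... ] }
# into a hash of "schema.table.column" => "<operation>".
sub build_column_map {
    my ($json_text) = @_;
    my $defs = decode_json($json_text);
    my %map;
    for my $operation (sort keys %$defs) {
        for my $selection (@{ $defs->{$operation} }) {
            for my $column (@{ $selection->{columns} }) {
                my $key = join('.', $selection->{schema}, $selection->{table}, $column);
                $map{$key} = $operation;
            }
        }
    }
    return \%map;
}

# The "different operations for two tables" example from above.
my $example = <<'JSON';
{
  "String_to_HEX_v1" : [
    { "table" : "hextable", "schema" : "hexdb", "columns" : [ "hexcol" ] }
  ],
  "HEX_to_String_v1" : [
    { "table" : "hexagon", "schema" : "sourcetext", "columns" : [ "itemtext" ] }
  ]
}
JSON

my $map = build_column_map($example);
print "$_ -> $map->{$_}\n" for sort keys %$map;
```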
Network Filter Protocol
The network filter protocol is designed to be lightweight and binary-safe, because it must work with data that may be heavily encoded, binary, or compressed.
The protocol operates through a combined JSON and optional binary payload structure that communicates the information. The JSON defines the communication type and metadata, while the binary payload contains the raw or translated information.
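Each message is therefore a JSON header plus an optional run of raw bytes whose length is given by the header's payload field. The sample server at the end of this section reads each header as one newline-terminated line and then reads exactly payload bytes; the Perl sketch below assumes that same framing. frame_message and parse_message are hypothetical helper names. Because compact JSON output escapes newlines inside strings, a newline can safely delimit the header even when the payload itself is binary.

```perl
use strict;
use warnings;
use JSON::PP;

# utf8: encode() returns bytes (safe to concatenate with a binary payload)
# and decode() expects bytes. canonical: keys are sorted, so output is stable.
my $json = JSON::PP->new->utf8->canonical;

# Build one wire message: a single-line JSON header, "\n", then the payload.
# The payload field is the byte length of $data, or -1 when there is no data.
sub frame_message {
    my ($header, $data) = @_;
    my %h = %$header;
    $h{payload} = defined $data ? length($data) : -1;
    return $json->encode(\%h) . "\n" . (defined $data ? $data : '');
}

# Split one message off the front of $buffer. Returns (header, data, rest),
# or an empty list when the buffer does not yet hold a complete message.
# data is undef when the header's payload is -1.
sub parse_message {
    my ($buffer) = @_;
    my $nl = index($buffer, "\n");
    return if $nl < 0;
    my $header = $json->decode(substr($buffer, 0, $nl));
    my $size   = $header->{payload} > 0 ? $header->{payload} : 0;
    my $start  = $nl + 1;
    return if length($buffer) < $start + $size;
    my $data = $header->{payload} >= 0 ? substr($buffer, $start, $size) : undef;
    return ($header, $data, substr($buffer, $start + $size));
}
```

For example, frame_message({ type => 'prepare', service => 'firstrep', protocol => 'v0_9' }, undef) yields {"payload":-1,"protocol":"v0_9","service":"firstrep","type":"prepare"} followed by a newline.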
The filter communicates with the network server using the following packet types:
- prepare

The prepare message is sent when the filter goes online. It initializes the connection to the network server and confirms the supported filter types and operation. The format of the message is:

{"payload" : -1, "type" : "prepare", "service" : "firstrep", "protocol" : "v0_9"}

Where:

- protocol: The protocol version.
- service: The name of the replicator service that called the filter.
- type: The message type.
- payload: The size of the payload; a value of -1 indicates that there is no payload.

The response should be a JSON object followed by a payload containing the list of supported transformation types. The payload immediately follows the JSON, and its size is given by the payload field of the returned JSON object:

{"payload" : 22, "type" : "acknowledged", "protocol" : "v0_9", "service" : "firstrep", "return" : 0}
Perl_BLOB_to_String_v1

Where:

- protocol: The protocol version.
- service: The name of the replicator service that called the filter.
- type: The message type; when acknowledging the original prepare request it should be "acknowledged".
- return: The return value. A value of 0 (zero) indicates no faults; any non-zero value indicates an error.
- payload: The length of the appended payload in bytes. The filter uses this to determine how much additional data to read after the JSON object has been read.

The payload should be a comma-separated list of the transformation types supported by the network server.
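On the server side, the reply to prepare can be built from the request with a JSON encoder rather than by string concatenation. This Perl sketch is illustrative: acknowledge_prepare is a hypothetical name, echoing the request's protocol and service fields is an assumption based on the example above, and the transformation names passed in are taken from the configuration examples.

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical helper: build the "acknowledged" reply to a prepare request.
# The payload is the comma-separated list of supported transformations.
sub acknowledge_prepare {
    my ($request, @transformations) = @_;
    my $list = join(',', @transformations);
    my $header = {
        type     => 'acknowledged',
        protocol => $request->{protocol},
        service  => $request->{service},
        'return' => 0,
        payload  => length($list),
    };
    # Header line, newline, then the payload bytes.
    return JSON::PP->new->utf8->canonical->encode($header) . "\n" . $list;
}

my $reply = acknowledge_prepare(
    { type => 'prepare', service => 'firstrep', protocol => 'v0_9', payload => -1 },
    'String_to_HEX_v1', 'HEX_to_String_v1',
);
print $reply, "\n";
```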
- filter

The filter message is sent by Tungsten Replicator for each value from the replication stream that needs to be filtered and translated. The request is a JSON object followed by a block of data, the payload, that contains the information to be filtered. For example:

{"schema" : "hexdb", "transformation" : "String_to_HEX_v1", "service" : "firstrep", "type" : "filter", "payload" : 22, "row" : 0, "column" : "hexcol", "table" : "hextable", "seqno" : 145196, "fragments" : 1, "protocol" : "v0_9", "fragment" : 1}
48656c6c6f20576f726c64

Where:

- protocol: The protocol version.
- service: The name of the service that requested the filter.
- type: The message type, in this case filter.
- row: The row of the source information from the THL that is being filtered.
- schema: The schema of the source information from the THL that is being filtered.
- table: The table of the source information from the THL that is being filtered.
- column: The column of the source information from the THL that is being filtered.
- seqno: The sequence number of the THL event being filtered.
- fragments: The number of fragments in the THL event being filtered.
- fragment: The fragment number within the THL event being filtered. Fragments may be sent to the network server individually and in sequence, so depending on the nature of the source data and the filter being applied, the server may need to collect and merge them to reconstitute the original value.
- transformation: The transformation to perform on the supplied payload. A single network server can support multiple transformations, so this field identifies the operation to apply. The value is taken from the JSON configuration file for the filter.
- payload: The length, in bytes, of the payload data that immediately follows the JSON request.

The payload that immediately follows the JSON block is the data from the column to be processed by the network server.

The response packet should contain a copy of the information supplied in the filter request, with the payload size updated to the size of the returned data, the message type changed to filtered, a return value added, and the payload containing the translated data. For example:

{"transformation" : "String_to_HEX_v1", "fragments" : 1, "type" : "filtered", "fragment" : 1, "return" : 0, "seqno" : 145198, "table" : "hextable", "service" : "firstrep", "protocol" : "v0_9", "schema" : "hexdb", "payload" : 8, "column" : "hexcol", "row" : 0}
FILTERED
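Handling a filter request reduces to three steps: look up the requested transformation, apply it to the payload bytes, and return a copy of the request header with type, return, and payload updated. In this Perl sketch the operation names come from the configuration examples, but what they do (hex encoding and decoding with pack and unpack) is assumed for illustration, as is reporting an unknown transformation with return set to 1 and no payload. Fragment reassembly is not handled; each call treats the payload as a complete value.

```perl
use strict;
use warnings;

# Hypothetical transformation table; the behaviour of each entry is assumed.
my %transformations = (
    String_to_HEX_v1 => sub { unpack('H*', $_[0]) },   # bytes -> hex text
    HEX_to_String_v1 => sub { pack('H*', $_[0]) },     # hex text -> bytes
);

# Given a decoded "filter" request header and its payload, return the
# "filtered" reply header and the new payload. The reply starts as a copy
# of the request, then type, return and payload are overridden.
sub handle_filter {
    my ($request, $data) = @_;
    my %reply = %$request;
    $reply{type} = 'filtered';
    my $code = $transformations{ $request->{transformation} };
    unless ($code) {
        # Unknown operation: non-zero return value and no payload (assumption).
        $reply{'return'} = 1;
        $reply{payload}  = -1;
        return (\%reply, undef);
    }
    my $out = $code->(defined $data ? $data : '');
    $reply{'return'} = 0;
    $reply{payload}  = length($out);
    return (\%reply, $out);
}
```

With these assumed semantics, String_to_HEX_v1 turns Hello World into 48656c6c6f20576f726c64, the 22-byte payload shown in the filter request example above.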
Sample Network Client
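The following sample network server script is written in Perl. It advertises a single transformation, Perl_BLOB_to_String_v1, and is intended as a starting point for a server that translates packed hex strings (two hex characters per byte) from their hex representation into their character representation. As written, it answers every filter request with the fixed placeholder string FILTERED rather than performing the conversion.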
#!/usr/bin/perl
use strict;
use warnings;
use IO::Socket::INET;
use JSON qw(decode_json encode_json);

# Auto-flush STDOUT so that progress messages appear immediately
$| = 1;

# Transformation list advertised in reply to a prepare request
my $serverName = "Perl_BLOB_to_String_v1";

# Create the listening socket once; it is reused for every client connection
my $socket = IO::Socket::INET->new(
    LocalHost => '0.0.0.0',
    LocalPort => '3112',
    Proto     => 'tcp',
    Listen    => 5,
    ReuseAddr => 1,
) or die "Cannot create socket: $!\n";

while (1)
{
    print "********\nServer waiting for client connection on port 3112\n******\n\n\n";

    # Wait for a new client connection
    my $client_socket = $socket->accept() or next;
    $client_socket->autoflush(1);

    # Get information about the newly connected client
    my $client_address = $client_socket->peerhost();
    my $client_port    = $client_socket->peerport();
    print "Connection from $client_address:$client_port\n";

    # Each message starts with one line of JSON; stop when the client disconnects
    while (defined(my $data = $client_socket->getline()))
    {
        chomp($data);
        print "\n\nReceived: <$data>\n";

        # Decode the JSON part
        my $msg = decode_json($data);

        # Read the payload, if the JSON header announces one
        my $payload = undef;
        if ($msg->{payload} > 0)
        {
            print STDERR "Reading $msg->{payload} bytes\n";
            $client_socket->read($payload, $msg->{payload});
            print "Payload: <$payload>\n";
        }

        if ($msg->{'type'} eq 'prepare')
        {
            print STDERR "Received prepare request\n";
            # Send acknowledged message, followed by the list of transformations
            my $out = encode_json({
                protocol => 'v0_9',
                type     => 'acknowledged',
                'return' => 0,
                service  => $msg->{'service'},
                payload  => length($serverName),
            });
            print $client_socket "$out\n$serverName";
            print "Sent: <$out>\n";
            print STDERR "Sent acknowledge request\n";
        }
        elsif ($msg->{'type'} eq 'release')
        {
            # Send acknowledged message with no payload
            my $out = encode_json({
                protocol => 'v0_9',
                type     => 'acknowledged',
                'return' => 0,
                service  => $msg->{'service'},
                payload  => 0,
            });
            print $client_socket "$out\n";
            print "Sent: <$out>\n";
        }
        elsif ($msg->{'type'} eq 'filter')
        {
            # Send filtered message; this sample always returns a fixed string
            print STDERR "Sending filtered payload\n";
            my $filtered = "FILTERED";
            my $out = encode_json({
                protocol       => 'v0_9',
                type           => 'filtered',
                transformation => $msg->{'transformation'},
                'return'       => 0,
                service        => $msg->{'service'},
                seqno          => $msg->{'seqno'},
                row            => $msg->{'row'},
                schema         => $msg->{'schema'},
                table          => $msg->{'table'},
                column         => $msg->{'column'},
                fragment       => $msg->{'fragment'},
                fragments      => $msg->{'fragments'},
                payload        => length($filtered),
            });
            print "About to send: <$out>\n";
            print $client_socket "$out\n$filtered";
            print "Response sent\n";
        }
        print "End of loop, hoping for next packet\n";
    }

    # Notify client that we're done writing, then close this connection
    shutdown($client_socket, 1);
    $client_socket->close();
}
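To exercise a network server without starting the replicator, a small client can play the filter's side of the conversation. This Perl sketch assumes the newline-terminated framing used by the sample server; exchange is a hypothetical helper, and the smoke test that connects to 127.0.0.1:3112 runs only when the (also hypothetical) NETWORK_FILTER_SMOKE environment variable is set. It is a testing aid, not a reproduction of how NetworkClientFilter itself is implemented.

```perl
use strict;
use warnings;
use IO::Handle;
use IO::Socket::INET;
use JSON::PP;

my $json = JSON::PP->new->utf8->canonical;

# Send one request (JSON header line plus optional payload) over $fh and
# read back one reply: a JSON line followed by exactly "payload" bytes.
sub exchange {
    my ($fh, $header, $data) = @_;
    my %h = %$header;
    $h{payload} = defined $data ? length($data) : -1;
    print {$fh} $json->encode(\%h), "\n", (defined $data ? $data : '')
        or die "write to network server failed: $!\n";
    my $line = <$fh>;
    die "network server closed the connection\n" unless defined $line;
    chomp $line;
    my $reply = $json->decode($line);
    my $body = '';
    if ($reply->{payload} > 0) {
        my $got = read($fh, $body, $reply->{payload});
        die "short read from network server\n"
            unless defined $got && $got == $reply->{payload};
    }
    return ($reply, $body);
}

# Hypothetical smoke test against a server on localhost:3112.
if ($ENV{NETWORK_FILTER_SMOKE}) {
    my $sock = IO::Socket::INET->new(
        PeerAddr => '127.0.0.1', PeerPort => 3112, Proto => 'tcp', Timeout => 10,
    ) or die "cannot connect: $!\n";
    $sock->autoflush(1);
    my %base = (protocol => 'v0_9', service => 'firstrep');
    my ($ack, $types) = exchange($sock, { %base, type => 'prepare' });
    print "acknowledged; transformations: $types\n";
    my ($reply, $out) = exchange($sock, {
        %base, type => 'filter', transformation => 'String_to_HEX_v1',
        schema => 'hexdb', table => 'hextable', column => 'hexcol',
        seqno => 1, row => 0, fragment => 1, fragments => 1,
    }, 'Hello World');
    print "filtered payload: $out\n";
    close($sock);
}
```

Against the sample server above, the smoke test should report Perl_BLOB_to_String_v1 as the only transformation and FILTERED as the returned payload.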