The NetworkClientFilter processes data in selected columns.
| Pre-configured filter name | networkclient |
| Classname | com.continuent.tungsten.replicator.filter.NetworkClientFilter |
| Property prefix | replicator.filter.networkclient |
| Stage compatibility | Any |
| tpm Option compatibility | --svc-extractor-filters, --svc-thl-filters, --svc-applier-filters |
| Data compatibility | Row events |

Parameters:

| Parameter | Type | Default | Description |
| definitionsFile | pathname | ${replicator.home.dir}/samples/extensions/java/networkclient.json | The name of a file containing the definitions for how columns should be processed by the filter |
| serverPort | number | 3112 | The network port on which the network server listens, and to which the filter connects |
| timeout | number | 10 | Timeout, in seconds, before the network server is treated as failed when waiting to send or receive content |
The network filter operates by sending field data, as defined in the corresponding filter configuration file, to a network server that processes the information and sends it back to be re-introduced in place of the original field data. This can be used to translate and reformat information during replication.
The filter operation works as follows:
All filtered data will be sent to a single network server, at the configured port.
A single network server can be used to provide multiple transformations.
The JSON configuration file for the filter supports multiple types and multiple column definitions.
The protocol used by the network filter must be followed to process the information effectively. A failure in the network server or in communication will cause the replicator to raise an error and replication to go OFFLINE.
The network server must be running before the replicator is started. If the network server cannot be found, replication will go OFFLINE.
Correct operation requires building a suitable network server that implements the defined protocol, and creating the JSON configuration file. A sample network server is provided for reference.
The configuration file defines the translation operation to be requested from the network server, in addition to the schema, table and column names. The file is in JSON format: each top-level key names an operation, and its value is an array of field selections identifying the columns that should be processed by that operation. For example:
{ "String_to_HEX_v1" : [ { "table" : "hextable", "schema" : "hexdb", "columns" : [ "hexcol" ] } ] }
The operation in this case is String_to_HEX_v1; this will be sent to the network server as part of the request. The column definition follows.
To send multiple columns from different tables to the same translation:
{ "String_to_HEX_v1" : [ { "table" : "hextable", "schema" : "hexdb", "columns" : [ "hexcol" ] }, { "table" : "hexagon", "schema" : "sourcetext", "columns" : [ "itemtext" ] } ] }
Alternatively, to configure different operations for the same two tables:
{ "String_to_HEX_v1" : [ { "table" : "hextable", "schema" : "hexdb", "columns" : [ "hexcol" ] } ], "HEX_to_String_v1" : [ { "table" : "hexagon", "schema" : "sourcetext", "columns" : [ "itemtext" ] } ] }
The network filter protocol has been designed to be lightweight and binary-safe, because it must handle data that may be heavily encoded, binary, or compressed.
Each message consists of a JSON object, optionally followed by a binary payload. The JSON defines the message type and metadata, while the binary payload contains the raw or translated information.
The filter communicates with the network server using the following packet types:
prepare
The prepare message is sent when the filter goes online, and is designed to initialize the connection to the network server and confirm the supported filter types and operation. The format of the connection message is:
{ "payload" : -1, "type" : "prepare", "service" : "firstrep", "protocol" : "v0_9" }
Where:
protocol: The protocol version.
service: The name of the replicator service that called the filter.
type: The message type.
payload: The size of the payload; a value of -1 indicates that there is no payload.
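Every message follows the same framing: a JSON object, then payload bytes if payload is greater than zero. The sample server at the end of this section reads the JSON with getline, which assumes each JSON object is terminated by a newline; the following Perl sketch makes the same assumption. The read_packet helper is hypothetical and reads one packet from any filehandle, such as a socket or, as in the example, an in-memory handle.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Hypothetical helper: read one packet (a newline-terminated JSON header
# plus "payload" bytes of raw data) from a filehandle. Returns an empty
# list when the peer has closed the connection. For real sockets, call
# binmode on the handle once before use so the payload stays byte-exact.
sub read_packet {
    my ($fh) = @_;
    my $line = <$fh>;
    return unless defined $line;
    chomp $line;
    my $header  = decode_json($line);
    my $payload = '';
    # A payload of -1 (or 0) means no data follows the JSON
    if ($header->{payload} > 0) {
        my $got = read($fh, $payload, $header->{payload});
        die "Short payload read\n" unless defined $got && $got == $header->{payload};
    }
    return ($header, $payload);
}

# Example: the prepare message shown above, read from an in-memory handle
my $wire = qq({ "payload" : -1, "type" : "prepare", "service" : "firstrep", "protocol" : "v0_9" }\n);
open(my $fh, '<', \$wire) or die "Cannot open in-memory handle: $!";
my ($header, $payload) = read_packet($fh);
print "$header->{type} from $header->{service}, protocol $header->{protocol}\n";
```

Reading the payload by its declared length, rather than by delimiter, is what keeps the framing safe for binary data that may itself contain newlines.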
The response should be a JSON object followed by a payload containing the list of supported transformations. The payload immediately follows the JSON, and its size is given in the payload field of the returned JSON object:
{ "payload" : 22, "type" : "acknowledged", "protocol" : "v0_9", "service" : "firstrep", "return" : 0 }Perl_BLOB_to_String_v1
Where:
protocol: The protocol version.
service: The name of the replicator service that called the filter.
type: The message type; when acknowledging the original prepare request it should be acknowledged.
return: The return value. A value of 0 (zero) indicates no faults; any non-zero value indicates there was an issue.
payload: The length of the appended payload information in bytes. The filter uses this to determine how much additional data to read after the JSON object has been read.
The payload should be a comma-separated list of the supported transformation types within the network server.
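A prepare reply can be assembled as in the following Perl sketch. The acknowledge_prepare helper is hypothetical; the newline between the JSON and the payload mirrors the sample server at the end of this section rather than a documented requirement, and JSON::PP's canonical option is used only to make the key order predictable.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical helper: build the reply to a prepare request. The JSON header
# announces the payload length; the payload is the comma-separated list of
# transformations this server supports.
sub acknowledge_prepare {
    my ($service, @transformations) = @_;
    my $list   = join(',', @transformations);
    my $header = JSON::PP->new->canonical->encode({
        protocol => 'v0_9',
        type     => 'acknowledged',
        service  => $service,
        return   => 0,
        payload  => length($list),   # bytes, assuming ASCII names
    });
    # Newline separator, as used by the sample server
    return "$header\n$list";
}

my $reply = acknowledge_prepare('firstrep', 'Perl_BLOB_to_String_v1');
print "$reply\n";
```

For the single transformation above, the payload field is 22, matching the example acknowledgement.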
filter
The filter message type is sent by Tungsten Replicator for each value from the replication stream that needs to be filtered and translated in some way. The format of the request is a JSON object with a trailing block of data, the payload, that contains the information to be filtered. For example:
{ "schema" : "hexdb", "transformation" : "String_to_HEX_v1", "service" : "firstrep", "type" : "filter", "payload" : 22, "row" : 0, "column" : "hexcol", "table" : "hextable", "seqno" : 145196, "fragments" : 1, "protocol" : "v0_9", "fragment" : 1 }48656c6c6f20576f726c64
Where:
protocol: The protocol version.
service: The name of the service that requested the filter.
type: The message type, in this case, filter.
row: The row of the source information from the THL that is being filtered.
schema: The schema of the source information from the THL that is being filtered.
table: The table of the source information from the THL that is being filtered.
column: The column of the source information from the THL that is being filtered.
seqno: The sequence number of the event from the THL that is being filtered.
fragments: The number of fragments in the THL event that is being filtered.
fragment: The fragment number within the THL event that is being filtered. Fragments may be sent individually and sequentially to the network server, so they may need to be retrieved, merged, and reconstituted depending on the nature of the source data and the filter being applied.
transformation: The transformation to be performed on the supplied payload data. A single network server can support multiple transformations, so this information is provided so that the server can perform the correct operation. The transformation is taken from the JSON configuration file for the filter.
payload: The length, in bytes, of the payload data that immediately follows the JSON filter request.
The payload that immediately follows the JSON block is the data from the column that should be processed by the network filter.
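Reassembling fragmented values is left to the network server. One possible approach, sketched below in Perl, buffers fragments by seqno, row, schema, table and column, and releases the value only once every fragment has arrived. It assumes fragments are numbered from 1 to fragments; what the server should reply for an intermediate fragment is not described here, so the sketch covers buffering only. The names add_fragment and %pending are hypothetical.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical reassembly buffer: partial values keyed by the event and
# column they belong to. Assumes fragments are numbered 1..fragments.
my %pending;

# Store one fragment; return the complete value once every fragment has
# arrived, or undef while fragments are still outstanding.
sub add_fragment {
    my ($header, $payload) = @_;
    my $key = join('|', @{$header}{qw(seqno row schema table column)});
    $pending{$key}[ $header->{fragment} - 1 ] = $payload;
    my @parts = @{ $pending{$key} };
    return undef if @parts < $header->{fragments} || grep { !defined $_ } @parts;
    delete $pending{$key};
    return join('', @parts);
}

# Example: the value from the filter request above, split into two fragments
my %base = (seqno => 145196, row => 0, schema => 'hexdb', table => 'hextable',
            column => 'hexcol', fragments => 2);
my $first  = add_fragment({ %base, fragment => 1 }, '48656c6c6f');
my $second = add_fragment({ %base, fragment => 2 }, '20576f726c64');
print(defined($first) ? "complete\n" : "waiting\n");
print "$second\n";
```

Storing fragments by number rather than appending them means the sketch still reassembles correctly if fragments arrive out of order.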
The response packet should contain a copy of the supplied information from the filter request, with the payload size updated to the size of the returned information, the message type changed to filtered, and the payload containing the translated data. For example:
{ "transformation" : "String_to_HEX_v1", "fragments" : 1, "type" : "filtered", "fragment" : 1, "return" : 0, "seqno" : 145198, "table" : "hextable", "service" : "firstrep", "protocol" : "v0_9", "schema" : "hexdb", "payload" : 8, "column" : "hexcol", "row" : 0 }FILTERED
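A filtered reply can be derived mechanically from the request: copy every field, then override type, return and payload. The filtered_reply helper below is a hypothetical Perl sketch; it assumes the translated data is a byte string (so length returns its size in bytes) and, like the sample server, places a newline between the JSON and the payload.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical helper: build a "filtered" reply by copying every field of the
# request header, then overriding type, return and payload. $translated is
# assumed to be a byte string, so length() gives its size in bytes.
sub filtered_reply {
    my ($request, $translated) = @_;
    my %reply = (
        %$request,
        type    => 'filtered',
        return  => 0,
        payload => length($translated),
    );
    # Newline separator between JSON and payload, as in the sample server
    return JSON::PP->new->canonical->encode(\%reply) . "\n" . $translated;
}

# The filter request shown above, decoded into a hash
my $request = {
    schema => 'hexdb', transformation => 'String_to_HEX_v1', service => 'firstrep',
    type => 'filter', payload => 22, row => 0, column => 'hexcol', table => 'hextable',
    seqno => 145196, fragments => 1, protocol => 'v0_9', fragment => 1,
};
print filtered_reply($request, 'FILTERED'), "\n";
```

Copying the whole request, rather than listing fields by hand, keeps seqno, row, fragment and the other metadata in step with the request automatically.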
The following sample network server script is written in Perl, and is designed to translate packed hex strings (two hex characters per byte) from their hex representation into their character representation.
#!/usr/bin/perl
use strict;
use warnings;
use IO::Socket::INET;
use JSON qw(decode_json encode_json);

# Auto-flush STDOUT so that log messages appear immediately
$| = 1;

# Comma-separated list of transformations supported by this server
my $serverName = "Perl_BLOB_to_String_v1";

# Create the listening socket once; the port must match the filter's
# serverPort parameter (default 3112)
my $socket = IO::Socket::INET->new(
    LocalHost => '0.0.0.0',
    LocalPort => '3112',
    Proto     => 'tcp',
    Listen    => 5,
    Reuse     => 1,
) or die "Cannot create socket: $!\n";

while (1) {
    print "********\nServer waiting for client connection on port 3112\n******\n\n\n";

    # Wait for a new client connection
    my $client_socket = $socket->accept() or next;
    binmode($client_socket);    # payloads may be binary

    # Get information about the newly connected client
    my $client_address = $client_socket->peerhost();
    my $client_port    = $client_socket->peerport();
    print "Connection from $client_address:$client_port\n";

    # Each request starts with a JSON object terminated by a newline
    while (defined(my $data = $client_socket->getline())) {
        chomp($data);
        print "\n\nReceived: <$data>\n";

        # Decode the JSON part
        my $msg = decode_json($data);

        # Read the payload, if any, that immediately follows the JSON
        my $payload = '';
        if ($msg->{payload} > 0) {
            print STDERR "Reading $msg->{payload} bytes\n";
            $client_socket->read($payload, $msg->{payload});
            print "Payload: <$payload>\n";
        }

        if ($msg->{type} eq 'prepare') {
            print STDERR "Received prepare request\n";
            # Acknowledge, listing the supported transformations in the payload
            my $out = encode_json({
                protocol => 'v0_9', type => 'acknowledged', return => 0,
                service  => $msg->{service}, payload => length($serverName),
            });
            print $client_socket "$out\n" . $serverName;
            print "Sent: <$out>\n";
            print STDERR "Sent acknowledge request\n";
        }
        elsif ($msg->{type} eq 'release') {
            # Acknowledge the release with an empty payload
            my $out = encode_json({
                protocol => 'v0_9', type => 'acknowledged', return => 0,
                service  => $msg->{service}, payload => 0,
            });
            print $client_socket "$out\n";
            print "Sent: <$out>\n";
        }
        elsif ($msg->{type} eq 'filter') {
            print STDERR "Sending filtered payload\n";
            # Translate the packed hex string (two hex characters per byte)
            # back into its character representation
            my $filtered = pack('H*', $payload);
            # Echo the request metadata, updating type, return and payload size
            my %reply = (%$msg, type => 'filtered', return => 0,
                         payload => length($filtered));
            my $out = encode_json(\%reply);
            print "About to send: <$out>\n";
            print $client_socket "$out\n" . $filtered;
            print "Response sent\n";
        }
        print "End of loop, hoping for next packet\n";
    }

    # Notify the client that we are done writing, then close the connection
    shutdown($client_socket, 1);
    $client_socket->close();
}
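The hex-to-character translation performed by the sample server maps directly onto Perl's built-in pack and unpack with the H* template. The sketch below wraps both directions in hypothetical helpers, hex_to_string and string_to_hex, and rejects malformed hex rather than passing it through.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper: convert a packed hex string (two hex characters per
# byte) into the bytes it represents, e.g. "48656c6c6f" becomes "Hello".
# Dies on malformed input so that a bad payload is reported, not mangled.
sub hex_to_string {
    my ($hex) = @_;
    die "Odd-length hex string\n" if length($hex) % 2;
    die "Non-hex characters in payload\n" if $hex =~ /[^0-9A-Fa-f]/;
    return pack('H*', $hex);
}

# Hypothetical helper for the reverse direction (lowercase hex output)
sub string_to_hex {
    my ($bytes) = @_;
    return unpack('H*', $bytes);
}

print hex_to_string('48656c6c6f20576f726c64'), "\n";
print string_to_hex('Hello World'), "\n";
```

Applied to the payload of the example filter request, hex_to_string returns Hello World. Inside a server you would probably trap the die with eval and report the failure rather than letting the process exit, since a failed network server takes replication OFFLINE.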