Thursday, November 26, 2009

Getting NetFlow entries from flowd via logsock into PostgreSQL with Perl

Here is a small Perl script that reads NetFlow entries from flowd through its logging socket (logsock) and inserts them into a PostgreSQL database.
Beware: it does no sanity checking at all!

=== cut here ===

#!/usr/bin/perl
use strict;
use warnings;
use IO::Socket;
use Socket;
use Flowd;
use DBI;

# Database settings
# The INSERT below calls to_timestamp(), so it is PostgreSQL-specific.
my $DBI_DRIVER = "Pg";
my $DB = "netflow";
my $HOST = "localhost";
my $TABLE = "flows"; # must match the table name in the schema
my $USER = "netflow";
my $PASS = "password";

# Unix datagram socket that flowd writes flow records to (its logsock)
my $sock_addr = "/var/run/flowd/flowd.sock";
unlink($sock_addr);

my $sock = IO::Socket::UNIX->new(Local => $sock_addr, Type => SOCK_DGRAM)
    or die "Can't bind to Unix Socket: $!\n";
$sock->setsockopt(SOL_SOCKET, SO_RCVBUF, 65440);

my $db = DBI->connect("dbi:$DBI_DRIVER:host=$HOST;database=$DB", $USER, $PASS)
    or die "DBI->connect error: " . $DBI::errstr;

# Prepare the statement once; placeholders keep field values out of the SQL text
my $ts = "to_timestamp(CAST(? AS double precision))";
my $sth = $db->prepare(
    "INSERT INTO $TABLE (recv_time, agent_addr, protocol_id, src_addr, src_port, "
  . "dst_addr, dst_port, packets, octets, flow_start, flow_finish) "
  . "VALUES ($ts, ?, ?, ?, ?, ?, ?, ?, ?, $ts, $ts)"
) or die "db->prepare failed: " . $DBI::errstr;

print "Started.\n";
my $input;
# recv() returns undef on error; test definedness because the returned
# sender address can be an empty string for a Unix datagram socket
while (defined $sock->recv($input, 1024)) {
    my $flowfields = Flowd::deserialise($input);

    # recv_usec is in microseconds, so pad the fraction to six digits
    my $recv_time = sprintf "%d.%06d", $flowfields->{recv_sec}, $flowfields->{recv_usec};
    # flow_start and flow_finish are exporter uptime values in milliseconds;
    # convert them to absolute times relative to the receive time
    my $flow_start = $recv_time + ($flowfields->{flow_start} - $flowfields->{sys_uptime_ms}) / 1000;
    my $flow_finish = $recv_time + ($flowfields->{flow_finish} - $flowfields->{sys_uptime_ms}) / 1000;

    $sth->execute(
        $recv_time,
        $flowfields->{agent_addr},
        $flowfields->{protocol},
        $flowfields->{src_addr},
        $flowfields->{src_port},
        $flowfields->{dst_addr},
        $flowfields->{dst_port},
        $flowfields->{flow_packets},
        $flowfields->{flow_octets},
        $flow_start,
        $flow_finish
    ) or die "execute failed: " . $sth->errstr;
}

=== cut here ===
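
The timestamp arithmetic in the loop is easy to get wrong, so here is a minimal standalone sketch of it. The script treats flow_start and flow_finish as exporter uptime values in milliseconds and offsets them from the time flowd received the record. The helper name flow_epoch and all sample values are made up for illustration, and the sketch assumes that sys_uptime_ms was stamped at roughly the moment the record was received.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper: convert an uptime-relative flow timestamp (ms)
# into absolute epoch seconds, using the receive time as the reference.
sub flow_epoch {
    my ($recv_sec, $recv_usec, $sys_uptime_ms, $flow_ms) = @_;
    my $recv_time = $recv_sec + $recv_usec / 1_000_000;
    return $recv_time + ($flow_ms - $sys_uptime_ms) / 1000;
}

# Made-up sample: record received at 1259200000.250000, exporter uptime
# 3,600,000 ms, flow started at 3,598,500 ms and finished at 3,599,750 ms.
my $start  = flow_epoch(1259200000, 250000, 3_600_000, 3_598_500);
my $finish = flow_epoch(1259200000, 250000, 3_600_000, 3_599_750);

printf "start=%.3f finish=%.3f duration=%.3f s\n",
    $start, $finish, $finish - $start;
# prints: start=1259199998.750 finish=1259200000.000 duration=1.250 s
```

The flow started 1.5 seconds before the record was received and finished 0.25 seconds before it, so both offsets are negative and the flow lasted 1.25 seconds.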

The SQL schema:
CREATE TABLE flows (
    id serial NOT NULL,
    recv_time timestamp with time zone DEFAULT now() NOT NULL,
    agent_addr inet NOT NULL,
    protocol_id integer NOT NULL,
    src_addr inet NOT NULL,
    src_port integer NOT NULL,
    dst_addr inet NOT NULL,
    dst_port integer NOT NULL,
    packets bigint DEFAULT 0 NOT NULL,
    octets bigint DEFAULT 0 NOT NULL,
    flow_start timestamp with time zone NOT NULL,
    flow_finish timestamp with time zone NOT NULL
);
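
Every column in this schema is NOT NULL and the script dies on a failed insert, so a single record with a missing field would stop collection. Below is a minimal sketch of a check that could run before the insert. The helper name missing_fields and the sample record are hypothetical, and the sketch assumes that a field flowd did not store simply shows up as a missing key in the deserialised hash.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Fields the collector script reads from each deserialised record
my @REQUIRED = qw(recv_sec recv_usec agent_addr protocol src_addr src_port
                  dst_addr dst_port flow_packets flow_octets
                  flow_start flow_finish sys_uptime_ms);

# Hypothetical helper: return the names of required fields that are undefined
sub missing_fields {
    my ($flow) = @_;
    return grep { !defined $flow->{$_} } @REQUIRED;
}

# Made-up record that carries no port information
my %sample = (
    recv_sec     => 1259200000,
    recv_usec    => 0,
    agent_addr   => '192.0.2.1',
    protocol     => 1,
    src_addr     => '192.0.2.10',
    dst_addr     => '192.0.2.20',
    flow_packets => 3,
    flow_octets  => 252,
    flow_start   => 1000,
    flow_finish  => 2000,
    sys_uptime_ms => 3000,
);

my @missing = missing_fields(\%sample);
print "skipping record, missing: @missing\n" if @missing;
# prints: skipping record, missing: src_port dst_port
```

In the receive loop, the same call on the hash returned by Flowd::deserialise could be followed by next to skip the record instead of dying.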