Search This Blog

Showing posts with label netflow flowd perl logsock postgresql freebsd. Show all posts
Showing posts with label netflow flowd perl logsock postgresql freebsd. Show all posts

Thursday, November 26, 2009

Get NetFlow entries from a flowd via logsock to put to PostgreSQL in Perl

Here is a small piece of code in Perl to get NetFlow entries from a flowd via logging socket (logsock) and put to a PostgreSQL database.
Beware, no sanity checking at all!!!

=== cut here ===

#!/usr/bin/perl
use IO::Socket;
use Socket;
use Flowd;
use DBI;

# Database settings
my $DBI_DRIVER = "Pg"; # or one of "Pg" "mysql" "mysqlPP"
my $DB = "netflow";
my $HOST = "localhost";
my $TABLE = "flow";
my $USER = "netflow";
my $PASS = "password";

$sock_addr="/var/run/flowd/flowd.sock";
unlink($sock_addr);

$sock = IO::Socket::UNIX->new( Local => $sock_addr, Type => SOCK_DGRAM)
or die "Can't bind to Unix Socket: $!\n";
$sock->setsockopt(SOL_SOCKET, SO_RCVBUF, 65440);

my $db = DBI->connect("dbi:$DBI_DRIVER:host=$HOST;database=$DB", $USER, $PASS)
or die "DBI->connect error: " . $DBI::errstr;

print "Started.\n";
while ($bytes = $sock->recv($input,1024)) {
$flowfields = Flowd::deserialise($input);

$recv_time = sprintf "%s.%03d",$flowfields->{recv_sec}, $flowfields->{recv_usec};
$flow_start = $recv_time + ($flowfields->{flow_start} - $flowfields->{sys_uptime_ms})/1000;
$flow_finish = $recv_time + ($flowfields->{flow_finish} - $flowfields->{sys_uptime_ms})/1000;

$sql = sprintf("INSERT INTO flows (recv_time, agent_addr, protocol_id, src_addr, src_port, dst_addr, dst_port, packets, octets, flow_start, flow_finish) VALUES (to_timestamp('%s'), '%s', '%u', '%s', '%u', '%s', '%u', '%s', '%s', to_timestamp('%s'), to_timestamp('%s'))",
$recv_time,
$flowfields->{agent_addr},
$flowfields->{protocol},
$flowfields->{src_addr},
$flowfields->{src_port},
$flowfields->{dst_addr},
$flowfields->{dst_port},
$flowfields->{flow_packets},
$flowfields->{flow_octets},
$flow_start,
$flow_finish
);
$db->do($sql) or die "db->do failed: " . $DBI::errstr;
}
1;

=== cut here ===

The SQL schema:
CREATE TABLE flows (
id serial NOT NULL,
recv_time timestamp with time zone DEFAULT now() NOT NULL,
agent_addr inet NOT NULL,
protocol_id integer NOT NULL,
src_addr inet NOT NULL,
src_port integer NOT NULL,
dst_addr inet NOT NULL,
dst_port integer NOT NULL,
packets bigint DEFAULT 0 NOT NULL,
octets bigint DEFAULT 0 NOT NULL,
flow_start timestamp with time zone NOT NULL,
flow_finish timestamp with time zone NOT NULL
);