Here is a small Perl script that reads NetFlow entries from flowd via its logging socket (logsock) and stores them in a PostgreSQL database.
Beware: it does no sanity checking at all!
=== cut here ===
#!/usr/bin/perl
use IO::Socket;
use Socket;
use Flowd;
use DBI;
use strict;
use warnings;

# Collector: binds a Unix datagram socket that flowd writes to (its logsock),
# decodes every datagram with Flowd::deserialise() and inserts one row per
# flow into a PostgreSQL table.  Runs until recv() or the database fails.

# Database settings
my $DBI_DRIVER = "Pg";          # the INSERT below calls to_timestamp(); other drivers would need a different statement
my $DB         = "netflow";
my $HOST       = "localhost";
my $TABLE      = "flows";       # must match the name used in CREATE TABLE
my $USER       = "netflow";
my $PASS       = "password";

# Socket settings
my $SOCK_ADDR   = "/var/run/flowd/flowd.sock";
my $MAX_DGRAM   = 1024;         # largest datagram read in one recv()
my $RCVBUF_SIZE = 65440;        # requested kernel receive buffer, in bytes

# Remove a socket file left over from a previous run so the path can be bound
# again.  The result is deliberately ignored: the file may simply not exist.
unlink($SOCK_ADDR);

my $sock = IO::Socket::UNIX->new(Local => $SOCK_ADDR, Type => SOCK_DGRAM)
    or die "Can't bind to Unix Socket: $!\n";

# A larger receive buffer is only an optimisation, so failure is not fatal.
$sock->setsockopt(SOL_SOCKET, SO_RCVBUF, $RCVBUF_SIZE)
    or warn "Can't set SO_RCVBUF on $SOCK_ADDR: $!\n";

my $db = DBI->connect("dbi:$DBI_DRIVER:host=$HOST;database=$DB", $USER, $PASS)
    or die "DBI->connect error: " . $DBI::errstr;

# Prepare once, execute per flow.  Placeholders keep values that arrive from
# the network out of the SQL text.  $TABLE is a trusted local constant.
my $sth = $db->prepare(
      "INSERT INTO $TABLE (recv_time, agent_addr, protocol_id, src_addr, src_port, "
    . "dst_addr, dst_port, packets, octets, flow_start, flow_finish) "
    . "VALUES (to_timestamp(CAST(? AS double precision)), ?, ?, ?, ?, ?, ?, ?, ?, "
    . "to_timestamp(CAST(? AS double precision)), "
    . "to_timestamp(CAST(? AS double precision)))"
) or die "db->prepare failed: " . $DBI::errstr;

print "Started.\n";

my $input;

# recv() returns the sender's address, which may legitimately be an empty
# string; only undef signals an error, so test definedness rather than truth.
while (defined $sock->recv($input, $MAX_DGRAM)) {
    # One undecodable datagram should not stop the collector.
    # NOTE(review): assumes Flowd::deserialise() either dies or returns a
    # false value on bad input -- confirm against the Flowd module.
    my $flow = eval { Flowd::deserialise($input) };
    unless ($flow) {
        warn "Skipping undecodable flow record" . ($@ ? ": $@" : "\n");
        next;
    }

    # recv_usec is in microseconds, so it needs six zero-padded digits to
    # form the fractional part of the epoch timestamp.
    my $recv_time = sprintf "%s.%06d", $flow->{recv_sec}, $flow->{recv_usec} || 0;

    # flow_start / flow_finish are offset against sys_uptime_ms and scaled
    # from milliseconds to seconds, then anchored to the receive time.
    my $flow_start  = $recv_time + ($flow->{flow_start}  - $flow->{sys_uptime_ms}) / 1000;
    my $flow_finish = $recv_time + ($flow->{flow_finish} - $flow->{sys_uptime_ms}) / 1000;

    # Keep the '%u' coercion the string-built statement applied to these
    # fields, so a missing value is stored as 0 rather than sent as NULL.
    my ($protocol, $src_port, $dst_port) =
        map { sprintf '%u', $_ || 0 } @{$flow}{qw(protocol src_port dst_port)};

    $sth->execute(
        $recv_time,
        $flow->{agent_addr},
        $protocol,
        $flow->{src_addr},
        $src_port,
        $flow->{dst_addr},
        $dst_port,
        $flow->{flow_packets},
        $flow->{flow_octets},
        $flow_start,
        $flow_finish,
    ) or die "sth->execute failed: " . $sth->errstr;
}

# The loop only ends when recv() returned undef, i.e. on a socket error.
die "recv on $SOCK_ADDR failed: $!\n";
=== cut here ===
The SQL schema:
-- One row per NetFlow record inserted by the collector script.
CREATE TABLE flows (
    id          serial      NOT NULL,                -- auto-assigned row number
    recv_time   timestamptz NOT NULL DEFAULT now(),  -- when the record was received (recv_sec/recv_usec)
    agent_addr  inet        NOT NULL,                -- value of the record's agent_addr field
    protocol_id int         NOT NULL,                -- value of the record's protocol field
    src_addr    inet        NOT NULL,
    src_port    int         NOT NULL,
    dst_addr    inet        NOT NULL,
    dst_port    int         NOT NULL,
    packets     bigint      NOT NULL DEFAULT 0,      -- value of the record's flow_packets field
    octets      bigint      NOT NULL DEFAULT 0,      -- value of the record's flow_octets field
    flow_start  timestamptz NOT NULL,                -- computed by the script from flow_start and sys_uptime_ms
    flow_finish timestamptz NOT NULL                 -- computed by the script from flow_finish and sys_uptime_ms
);
Subscribe to:
Comments (Atom)