Massive Data Aggregation with Perl
by Fred Moyer

Poll and Poll Data Stores

Several of the parties involved questioned our use of XML and RDF as persistence media. Why not use a relational database? We decided against one for two main reasons: the incoming data arrived in several different schemas and formats, and we needed to absorb huge influxes of data in very short time periods.

Consider how a relational database could have handled the variation in schemas and formats. Creating vendor-specific drivers to handle each format would have been straightforward. To handle the variations in schema, we could have normalized each data stream and its attributes so that we could store all the data in source, object, attribute, and value tables. The problem with that approach is that you get one really big table holding all the values, which becomes more difficult to manage as time goes on. Another possible approach, which I have used in the past, is to create a separate table for each data stream to fit its schema, and then use left, right, and full outer joins to extract the needed information. It scales much better than the first approach, but it is not as well suited to data mining as a warehouse is.
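To make the first approach concrete, here is a minimal sketch of flattening records from two differently shaped data streams into source, object, attribute, and value rows. The `to_eav_rows` helper, the vendor names, and the record layouts are hypothetical and exist only for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: flatten one record from a data stream into
# (source, object, attribute, value) rows, one row per attribute.
sub to_eav_rows {
    my ($source, $object_id, $record) = @_;
    return map { [ $source, $object_id, $_, $record->{$_} ] }
           sort keys %$record;
}

# Two vendors with different schemas land in the same four columns.
my @rows = (
    to_eav_rows('vendor_a', '6372095736',
                { question => 1, answer => 1 }),
    to_eav_rows('vendor_b', '2420080069',
                { q_id => 2, support => 3, date => '2005-03-02' }),
);

print join("\t", @$_), "\n" for @rows;
```

Every attribute of every response becomes its own row, which is why the values table grows so quickly under this design.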

With regard to absorbing a lot of data very quickly, transactional relational databases have limitations when you insert or update data in a table with many rows. Additionally, the insert and update transactions are not asynchronous. When inserting or updating a record, the transaction does not complete until the indexes on that record's indexed fields have been updated. Those index updates take longer as the database grows, so every write slows down.

We wanted the transactions between users, machines, and the Kitchen Sync to be as asynchronous as possible. Because we took in data as RDF and warehoused it later for analysis, our intake rate would not degrade as the amount of data already collected grew. Data exchange between the vendors and us consisted of a few large RDF transactions per data set, so the length of each transaction depended solely on the speed of the network connection between the vendor and our data center.

With the decision to use XML for storing poll metadata and RDF for storing poll data in place, we turned our attention to the specifics of the persistence layer. We stored the poll objects in XML, as shown in this example:


<?xml version="1.0"?>
<poll>
    <creator>Fred Moyer</creator>
    <date>2005-03-01</date>
    <vendor>Voter Data Inc.</vendor>
    <location>https://www.voterdatainc.com/poll/1234</location>
    <questions>
        <question>
            <name>Who is buried in Grant's Tomb?</name>
            <answers>
                <answer>
                    <name>Ulysses Grant</name>
                    <value>0</value>
                </answer>
                <answer>
                    <name>John Kerry</name>
                    <value>1</value>
                </answer>
                <answer>
                    <name>George Bush</name>
                    <value>2</value>
                </answer>
                <answer>
                    <name>Alfred E. Neumann</name>
                    <value>3</value>
                </answer>
            </answers>
        </question>
    </questions>
    <media>
        <pdf>
            <name>Name of a PDF file describing this poll</name>
            <raw>The raw contents of the PDF file</raw>
            <text>The text of the PDF file, generated with XPDF libs</text>
        </pdf>
    </media>
</poll>
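As a quick illustration of working with a document in this shape, the following sketch uses XML::LibXML XPath queries to build a lookup from answer values to answer names. It is not taken from our code base; the `%answers_for` structure is an assumption, and the embedded document is a trimmed copy of the example above.

```perl
use strict;
use warnings;
use XML::LibXML;

# A trimmed copy of the poll document (two answers only).
my $xml = <<'XML';
<?xml version="1.0"?>
<poll>
    <creator>Fred Moyer</creator>
    <questions>
        <question>
            <name>Who is buried in Grant's Tomb?</name>
            <answers>
                <answer><name>Ulysses Grant</name><value>0</value></answer>
                <answer><name>John Kerry</name><value>1</value></answer>
            </answers>
        </question>
    </questions>
</poll>
XML

my $doc = XML::LibXML->load_xml(string => $xml);

# Build a value => name lookup for each question.
my %answers_for;
for my $question ($doc->findnodes('/poll/questions/question')) {
    my $name = $question->findvalue('name');
    for my $answer ($question->findnodes('answers/answer')) {
        $answers_for{$name}{ $answer->findvalue('value') }
            = $answer->findvalue('name');
    }
}

for my $name (sort keys %answers_for) {
    print "$name\n";
    print "  $_ => $answers_for{$name}{$_}\n"
        for sort { $a <=> $b } keys %{ $answers_for{$name} };
}
```

A lookup like this is one way to translate the numeric answer values that appear in the poll data back into the answer text stored with the poll metadata.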

We also needed an API to manage those documents. We chose Berkeley DBXML because of its simple but effective API and its ability to scale to terabyte size if needed. We created a poll class that used the Sleepycat DbXml bindings and XML::LibXML and provided some Perlish methods for manipulating polls.

package KSYNC::Model::Poll;

use strict;
use warnings;

use base qw(KSYNC::Model);
use Sleepycat::DbXml qw(simple);
use XML::LibXML;
use KSYNC::Exception;

my $ACTIVITY_LOC = 'data/poll.dbxml';

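# Initialize the DbXml container at file scope so that save() can see it.
# (A lexical declared inside a BEGIN block goes out of scope when the block
# ends, and BEGIN would also run before $ACTIVITY_LOC is assigned.)
my $container = XmlContainer->new($ACTIVITY_LOC);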

# Call base class constructor KSYNC::Model->new
sub new {
    my ($class, %args) = @_;

    my $self = $class->SUPER::new(%args);
    return $self;
}

# Transform the poll object into an xml document
sub as_xml {
    my ($self, $id) = @_;
    
    my $dom = XML::LibXML::Document->new();
    my $pi = $dom->createPI( 'xml-stylesheet', 
                             'href="/css/poll.xsl" type="text/xsl"' );
    $dom->appendChild($pi);
    my $element = XML::LibXML::Element->new('Poll');

    $element->appendTextChild('Type',        $self->type);
    $element->appendTextChild('Creator',     $self->creator);
    $element->appendTextChild('Description', $self->description);
    $element->appendTextChild('Vendor',      $self->vendor);
    $element->appendTextChild('Began',       $self->began);
    $element->appendTextChild('Completed',   $self->completed);

    my $questions = XML::LibXML::Element->new('Questions');

    for my $question ( @{ $self->{question} } ) {
        $questions->appendChild($question->as_element);
    }

    $element->appendChild($questions);

    $dom->setDocumentElement($element);
    return $dom;
}

sub save {
    my $self = shift;

    # Connect to the DbXml database
    $container->open(Db::DB_CREATE);

    # Create a new document for storage from xml serialization of $self
    my $doc = XmlDocument->new();
    # as_xml returns an XML::LibXML::Document, so serialize it to a string
    $doc->setContent($self->as_xml->toString);
    
    # Save, throw an exception if problems happen
    eval { $container->putDocument($doc); };
    KSYNC::Exception->throw("Could not add document: $@") if $@;

    # Return the ID of the newly added document
    return $doc->getID();
}

We chose RDF as the format for poll data because the format contains links to resources that describe the namespaces of the document, making the document self-describing. The availability of standardized namespaces such as Dublin Core gave us predefined tags such as dc:date and dc:creator. We added our own namespaces for representation of poll data. Depending on how much detail the vendors kept, we could add dc:date tags to different portions of the document to provide historical references. We constructed our URLs in a REST format for all web-based resources.

<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF
    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
    xmlns:dc="http://purl.org/dc/elements/1.1/"
    xmlns:ourparty="http://www.ourparty.org/xml/schema#">

    <rdf:Description rdf:about="http://www.ourparty.org/poll/1234">
        <dc:date>2004-10-14</dc:date>
        <dc:creator>fmoyer@plusthree.com</dc:creator>
    </rdf:Description>

    <rdf:Bag>
        <rdf:li ourparty:id="6372095736" ourparty:question="1"
                ourparty:answer="1" dc:date="2005-03-01" />
        <rdf:li ourparty:id="2420080069" ourparty:question="2"
                ourparty:answer="3" dc:date="2005-03-02" />
    </rdf:Bag>
</rdf:RDF>

We used SAX machines as drivers to generate summary models of RDF files and LibXML streaming parsers to traverse the RDF files. We stacked drivers by using pipelined SAX machines and constructed SAX drivers for the different vendor data schemas. Cron-based machines scanned the RDF store, identified new poll data, and processed them into summary XML documents which we served to administrative users via XSLT transformations. Additionally, we used the SAX machines to create denormalized SQL warehouses for data mining.

An example SAX driver for Voter Data, Inc. RDF poll data:

package KSYNC::SAX::Voterdatainc;

use strict;
use warnings;

use base qw(KSYNC::SAX);

my %NS = (
    rdf      => 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
    dc       => 'http://purl.org/dc/elements/1.1/',
    ourparty => 'http://www.ourparty.org/xml/schema#',
);

my $VENDOR = 'Voter Data, Inc.';

sub new {
    my $class = shift;

    # Call the super constructor to create the driver
    my $self = $class->SUPER::new(@_, { vendor => $VENDOR });

    return $self;
}

sub start_element {
    my ($self, $data) = @_;
    
    # Process rdf:li elements
    if ( $data->{Name} eq 'rdf:li' ) {
    
        # Grab the data
        my $id      = $data->{Attributes}{ "{$NS{ourparty}}id" }{Value};
        my $answer  = $data->{Attributes}{ "{$NS{ourparty}}answer" }{Value};
        my $creator = $data->{Attributes}{ "{$NS{dc}}creator" }{Value};
        my $date    = $data->{Attributes}{ "{$NS{dc}}date" }{Value};

        # Map the data to a common response
        $self->add_response({ vendor        => $VENDOR,
                              voter_id      => $id, 
                              support_level => $answer, 
                              creator       => $creator,
                              date          => $date,
                           });

        # Call the base class start_element method to do something with the data
        $self->SUPER::start_element($data);
    }
}

1;
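The driver above relies on an `add_response` method inherited from the `KSYNC::SAX` base class, which this article does not show. The following is a minimal sketch of how such a base class might accumulate responses and expose summary methods such as `average_support_level`. The package name `My::SummaryDriver` and all of its internals are assumptions for illustration, not our actual implementation.

```perl
use strict;
use warnings;

package My::SummaryDriver;    # hypothetical stand-in for KSYNC::SAX

sub new {
    my ($class, %args) = @_;
    return bless { %args, count => 0, total => 0 }, $class;
}

# Fold one normalized response into the running summary.
sub add_response {
    my ($self, $response) = @_;

    $self->{count}++;
    $self->{total} += $response->{support_level};

    # ISO 8601 dates (YYYY-MM-DD) compare correctly as plain strings.
    my $date = $response->{date};
    if (defined $date) {
        $self->{min_date} = $date
            if !defined $self->{min_date} || $date lt $self->{min_date};
        $self->{max_date} = $date
            if !defined $self->{max_date} || $date gt $self->{max_date};
    }
    return;
}

sub average_support_level {
    my $self = shift;
    return $self->{count} ? $self->{total} / $self->{count} : undef;
}

sub minimum_date { return $_[0]{min_date} }
sub maximum_date { return $_[0]{max_date} }

package main;

my $driver = My::SummaryDriver->new(vendor => 'Voter Data, Inc.');
$driver->add_response({ support_level => 1, date => '2005-03-01' });
$driver->add_response({ support_level => 3, date => '2005-03-02' });

printf "Average support level: %.1f\n", $driver->average_support_level;
print  "Starting date: ", $driver->minimum_date, "\n";
print  "Ending date: ",   $driver->maximum_date, "\n";
```

Keeping only running totals means the driver never holds a whole data set in memory, which suits a streaming parser.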

We stored RDF documents compressed in bzip2 format, because the bzip2 compression algorithm is especially efficient at compressing repeating element data. As shown below in the SAX machine example, using bzcat as the intake to a pipeline parser allowed us to decompress the bzip2 documents on the fly for parsing and for creating a summary of a poll data set.

#!/usr/bin/env perl

use strict;
use warnings;

use KSYNC::DBI;
use KSYNC::SAX::DBI;
use KSYNC::SAX::Voterdatainc;
use XML::SAX::Machines qw(Pipeline);

# The poll data
my $rdf = 'data/voterdatainc/1759265.rdf.bz2';

# Create a vendor specific driver
my $driver = KSYNC::SAX::Voterdatainc->new();

# Create a driver to add the data to a data warehouse handle
my $dbh = KSYNC::DBI->connect();
my $warehouser = KSYNC::SAX::DBI->new(
                    source => 'http://www.voterdatainc/ourparty/poll.xml',
                    dbh    => $dbh,
                );

# Create a parser which uncompresses the poll data set, summarizes it, and 
# outputs data to a filter which warehouses the denormalized data
my $parser = Pipeline(
                "bzcat $rdf |" =>
                $driver        => 
                $warehouser,
             );

# Parse the poll data
$parser->parse();

# Summarize the poll data
print "Average support level:  ", $driver->average_support_level, "\n";
print "Starting date:  ",         $driver->minimum_date,          "\n";
print "Ending date:  ",           $driver->maximum_date,          "\n";
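The script above shells out to bzcat. As an alternative, Perl's bundled IO::Compress::Bzip2 and IO::Uncompress::Bunzip2 modules can compress and stream bzip2 data in-process. The sketch below is not from the original code; it builds a synthetic, repetitive payload (an assumption standing in for real poll data) to show both the compression ratio on repeating elements and line-by-line reading.

```perl
use strict;
use warnings;
use IO::Compress::Bzip2     qw(bzip2 $Bzip2Error);
use IO::Uncompress::Bunzip2 qw($Bunzip2Error);

# Build a small, repetitive RDF-like payload (10,000 rdf:li lines).
my $rdf = join '', map {
    qq{<rdf:li ourparty:id="$_" ourparty:question="1" ourparty:answer="1" />\n}
} 1 .. 10_000;

# Compress in memory; repeating element data shrinks dramatically.
my $compressed;
bzip2(\$rdf => \$compressed)
    or die "bzip2 failed: $Bzip2Error";

printf "raw: %d bytes, compressed: %d bytes\n",
    length($rdf), length($compressed);

# Stream the compressed data back one line at a time, as a parser would.
my $z = IO::Uncompress::Bunzip2->new(\$compressed)
    or die "bunzip2 failed: $Bunzip2Error";

my $lines = 0;
$lines++ while defined $z->getline;
$z->close;

print "read $lines lines\n";
```

Reading through a filehandle-like object keeps memory use flat regardless of the size of the uncompressed document.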

Between the polls, the XML Schema dictionaries, and the RDF files, we knew whom the polls contacted, what those voters saw, and how they responded. A major benefit of keeping the collected information in RDF format is the preservation of historical information. We constructed SQL warehouses to analyze changes in voter support levels over time. This was critical for measuring the effect of events such as presidential debates on voter interest and support.

Using RDF also provided us with the flexibility to map new data sources as needed. If a vendor collected some information which we had not processed before, they would add a description element with an rdf:about attribute, such as <rdf:Description rdf:about="http://www.datavendor.com/ourparty/poll5.xml" />, which we would map to features of our SAX machines as needed.

We added some hooks to the SAX machines to match certain URIs and then process selected element data. Late in the campaign, when early voting started, we were able to quickly modify our existing SAX machines to collect early voting data from the data streams and produce SQL warehouses for analysis.
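One way to picture such hooks is a dispatch table that pairs URI patterns with handlers. This is a minimal sketch under assumptions: the `@hooks` table, the `dispatch` function, and the early-voting URI layout are all hypothetical, and our actual SAX machines may have been organized differently.

```perl
use strict;
use warnings;

# Hypothetical hook table: each entry pairs a URI pattern with a handler.
# The early-voting URI layout is invented for this example.
my @hooks = (
    [ qr{/ourparty/earlyvote/} => sub {
          my ($uri, $attrs) = @_;
          return "early vote recorded for voter $attrs->{id}";
      } ],
    [ qr{/ourparty/poll\d+\.xml$} => sub {
          my ($uri, $attrs) = @_;
          return "poll response from voter $attrs->{id}";
      } ],
);

# Run the first handler whose pattern matches the rdf:about URI.
sub dispatch {
    my ($uri, $attrs) = @_;
    for my $hook (@hooks) {
        my ($pattern, $handler) = @$hook;
        return $handler->($uri, $attrs) if $uri =~ $pattern;
    }
    return;    # no hook matched, so the element is ignored
}

for my $uri (
    'http://www.datavendor.com/ourparty/poll5.xml',
    'http://www.datavendor.com/ourparty/earlyvote/7.xml',
    'http://www.datavendor.com/ourparty/unknown',
) {
    my $result = dispatch($uri, { id => '6372095736' });
    print "$uri: ", (defined $result ? $result : 'no hook'), "\n";
}
```

With a table like this, supporting a new kind of data means adding one entry rather than changing the parser itself.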
