
Massive Data Aggregation with Perl

by Fred Moyer
May 05, 2005

This article is a case study of the use of Perl and XML/RDF technologies to channel disparate sources of data into a semi-structured repository. We then built structured OLAP warehouses by mining that RDF repository with SAX machines. Channels of data included user-contributed datasets, data from remote FTP- and HTTP-based repositories, and data from other intra-enterprise assets. We called the system the 'Kitchen Sync', but one of the project's visionaries best described it as akin to a device that accepts piles of random coins and returns them sorted for analysis. This system collected voter data and was the primary data collection point in a national organization for the presidential campaign during the 2004 election.

Introduction

My initial question was why anyone would want to store data in XML/RDF formats. It's verbose, it lacks widely accepted query interfaces (such as SQL), and it generally requires more work than a database. XML, in particular, is a great messaging interface, but a poor persistence medium.

Eventually, I concluded that this particular implementation did benefit from the use of XML and RDF as messaging protocols. The messaging interface involved the use of SAX machines to parse a queue of XML and RDF files. The XML files contained the metadata for what we called polls, and the RDF files contained data from those polls. We had a very large buffer, from which cron-based processes frequently constructed data warehouses for analysis.


Hindsight and Realizations

The difficulty of this project lay in gathering requirements and interfacing with vendors. When implementing application workflow, it is critical to use a programming language that doesn't get in the way and allows you to do what you want--and that is where Perl really shone. A language that allows for quick development is an asset, especially in a rushed environment where projects are due "yesterday". The code samples here are not examples of how to write great object-oriented Perl code. They are real-world examples of the code used to get things done in this project.

For example, when a voter-data vendor changed its poll format, our data collection spiders stopped returning data and alerted our staff immediately. In just minutes, we adapted our SAX machine to the vendor's new format and had our data streams back up and running. It would have taken hours or days to call the vendor about the change and engage in a technical discussion to get them to do things our way. Instead, Perl allowed us to adapt to their ways quickly and efficiently.

Project Goals

The architects of this project specified several goals and metrics for the application. The main goals--with the ultimate objective being to accumulate as much data as possible before election day--were to:

  • Develop a web-based application for defining metadata of polls, and uploading sets of poll data to the system.

    The application had to let users define sets of questions and answers known as polls. Poll metadata could include related data held in documents of standard business formats (.doc, .pdf). Users also needed an easy way to upload data to the system, one that minimized possible errors.

  • Meet requirements of adding 50 million new records per day.

    That metric corresponds to approximately 579 records per second (50,000,000 records divided by 86,400 seconds). Assuming a non-uniform load distribution over time, peak transaction requirements were likely to be orders of magnitude higher than that average.

  • Develop a persistent store for RDF and XML data representing polls and poll data.

    The web application had to generate XML documents from poll definitions and RDF documents from uploaded poll data. We stored the poll data in RDF. We needed an API to manage these documents.

  • Develop a mechanized data collection system for the retrieval of data from FTP- and HTTP-based data repositories.

    The plan was to assimilate data sources into our organization from several commercial and other types of vendors. Most vendors had varying schemas and formats for their data. We wanted to acquire as much data as possible before the election to gauge voter support levels and other key metrics crucial to winning a political election.
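
The throughput target in the second goal above is simple arithmetic, sketched here for reference. The function names and the peak factor of 10 are hypothetical and chosen only for illustration; the article does not state what peak-to-average ratio the team planned for.

```perl
use strict;
use warnings;

# Average records per second needed to sustain a daily record target.
sub records_per_second {
    my ($records_per_day) = @_;
    my $seconds_per_day = 24 * 60 * 60;    # 86,400
    return $records_per_day / $seconds_per_day;
}

# Rate under an assumed peak-to-average ratio (hypothetical figure).
sub peak_rate {
    my ( $records_per_day, $peak_factor ) = @_;
    return records_per_second($records_per_day) * $peak_factor;
}

printf "average:     %.1f records/sec\n", records_per_second(50_000_000);
printf "peak at 10x: %.0f records/sec\n", peak_rate( 50_000_000, 10 );
```

Sizing for the peak rather than the average is the point of the exercise: a system that only just sustains the average falls behind whenever uploads cluster.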

Web Application

When I started this project, I had been using mod_perl2 extensively for prototyping applications and as a means of exploring its new features. mod_perl2 had proven itself stable enough to use in production, so I implemented a Model-View-Controller application design pattern using a native mod_perl2 and a libapreq2-enabled Apache server. I adopted the controller design patterns from recipes in the mod_perl Developer's Cookbook. The model classes subclassed Berkeley DBXML and XML::LibXML for object methods and persistence. We used Template Toolkit to implement views. (I will present more about the specifics of the persistence layer later in this article.)

Of primary importance with the web application component of the system was ease of use. If the system was not easy to use, then we would likely receive less data as a result of user frustration. The part of the web application with the longest transaction times was the poll data upload.

If a user uploaded a 10MB file over a 10KB/s upstream connection (common for residential DSL lines), the transaction would take approximately 17 minutes. On a 100KB/s upstream connection (business-grade DSL), the transaction would take nearly two minutes--certainly much longer than most unsuspecting users would wait before clicking on the browser refresh button.
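
As a rough check on those estimates, here is a minimal sketch of the transfer-time arithmetic. It assumes the connection rates are kilobytes per second and uses decimal units (1 MB = 1,000,000 bytes); `upload_seconds` is a hypothetical helper, not part of the application.

```perl
use strict;
use warnings;

# Estimated transfer time in seconds for a file of $size_bytes
# over a link that sustains $bytes_per_second.
sub upload_seconds {
    my ( $size_bytes, $bytes_per_second ) = @_;
    die "rate must be positive\n" unless $bytes_per_second > 0;
    return $size_bytes / $bytes_per_second;
}

my $file_size = 10 * 1000 * 1000;    # 10 MB, decimal units assumed

for my $rate_kb ( 10, 100 ) {
    my $seconds = upload_seconds( $file_size, $rate_kb * 1000 );
    printf "%3d KB/s: %.0f seconds (%.1f minutes)\n",
        $rate_kb, $seconds, $seconds / 60;
}
```

Real uploads run somewhat slower than this because of protocol overhead and multipart form encoding, so these figures are lower bounds.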

To prevent the user from accidentally corrupting the lengthy upload process, I created a monitoring browser window which opened via the following JavaScript call when the user clicked the upload button.

<input type=submit name='submit' value='Upload'
    onClick="window.open('/ksync/dataset/monitor', 'Upload',
       'width=740,height=400')">

The server forked off a child process which read the upload status from a BerkeleyDB database. The parent process used a libapreq UPLOAD_HOOK-based approach to measure the amount of data uploaded, and to write that plus a few other metrics to the BerkeleyDB database. The following is a snippet of code from the upload handler:

<Location /ksync/poll/data/progress>
    PerlResponseHandler KSYNC::Apache::Data::Upload->progress
</Location>

sub progress : method {
    my ( $self, $r ) = @_;

    # We deal with commas and tabs as delimiters currently
    my $delimiter;

    # Create a BerkeleyDB to keep track of upload progress
    my $db = _init_status_db( DB_CREATE );

    # Get the specifics of the poll we're getting data for
    my $poll = $r->pnotes('SESSION')->{'poll'};

    # Generate a unique identifier for files based on the poll
    my $id = _file_id($poll);

    # Store any data which does not validate according to the poll schema
    my $invalid = IO::File->new();
    my $ivfn = join '', $config->get('data_root'), '/invalid/', $id, '.txt';
    $invalid->open( $ivfn, '>' )
      or die "Cannot open $ivfn for writing: $!";

    # Set the rdf filename
    my $gfn = join '', $config->get('data_root'), '/valid/', $id, '.rdf';

    # Create an RDF document object to store the data
    my $rdf = KSYNC::Model::Poll::Data::RDF->new(
                $gfn, 
                $poll,
                $r->pnotes('SESSION')->{'creator'}, 
                DateTime->now->ymd, 
    );

    # Get the poll questions to make sure the answers are valid
    my $questions = $poll->questions;

    # Create a data structure to hold the answers to validate against.
    my @valid_answers = _valid_answers($questions);

    # And a data structure to hold the validation results
    my $question_data = KSYNC::Model::Poll::validation_results($questions);

    # Set progress store parameters
    my $length              = 0;
    my $good_lines_total    = 0;
    my $invalid_lines_total = 0;
    my $began;              # Boolean to determine if we've started parsing data
    my $li                  = 1;    # Starting line number

    # The subroutine to process uploaded data
    my $fragment;
    my $upload_hook = sub {
        my ( $upload, $data, $data_len, $hook_data ) = @_;

        if ( !$began ) {   # First chunk: check the column count of the data

            # Chop up the stream
            my @lines = split "\n", $data;

            # Determine the delimiter for this dataset
            $delimiter = _delimiter(@lines);

            # Split into an array so the field count is explicit
            my @fields = split /$delimiter/, $lines[0];
            unless ( @fields == scalar( @{$question_data} ) + 1 )
            {
                $db->db_put( 'done', '1' );
                
                # The dataset isn't valid, so throw an exception
                KSYNC::Apache::Exception->throw('Invalid Dataset!');
            }
        }

        # Mark the start of the upload
        $began = 1;

        # Validate the data against the poll answers we've defined
        my ( $good_lines, $invalid_lines );

        ( $good_lines, $invalid_lines, $question_data, $li, $fragment ) =
          KSYNC::Model::Poll::Data::validate( \@valid_answers, 
                                              $data, 
                                              $question_data,
                                              $li, 
                                              $delimiter, 
                                              $fragment );

        # Keep up the running count of good and invalid lines
        $good_lines_total     += scalar( @{$good_lines} );
        $invalid_lines_total  += scalar( @{$invalid_lines} );

        # Increment the number of bytes processed
        $length += length($data);

        # Update the status for the monitor process
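        # BerkeleyDB's db_put stores one key/value pair per call
        my %status = (
            valid     => $good_lines_total,
            invalid   => $invalid_lines_total,
            bytes     => $length,
            filename  => $upload->filename,
            filetype  => $upload->type,
            questions => $question_data,
        );
        $db->db_put( $_, $status{$_} ) for keys %status;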

        # And store the data we've collected
        $rdf->write( $good_lines ) if scalar( @{$good_lines} );

        # Write out any invalid data points to a separate file
        _write_txt( $invalid, $invalid_lines ) if scalar( @{$invalid_lines} );
    };

    my $req = Apache::Request->new(
        $r,
        POST_MAX    => 1024 * 1024 * 1024,    # One Gigabyte
        HOOK_DATA   => 'Note',
        UPLOAD_HOOK => $upload_hook,
        TEMP_DIR    => $config->get('temp_dir'),
    );

    my $upload = eval { $req->upload( scalar +( $req->upload )[0] ) };
    if ( ref $@ and $@->isa("Apache::Request::Error") ) {

        # ... handle Apache::Request::Error object in $@
        $r->headers_out->set( Location => 'https://'
              . $r->construct_server
              . '/ksync/poll/data/upload/aborted' );
        return Apache::REDIRECT;
    }

    # Finish up
    $invalid->close;
    $rdf->save;

    # Set status so the progress window will close
    $db->db_put( 'done', 1 );
    undef $db;
    
    # Send the user to the summary page
    $r->headers_out->set(
      Location => join('', 
                       'https://', 
                       $r->construct_server, 
                       '/poll/data/upload/summary',
                      )                   
    );
    return Apache::REDIRECT; 
}

During the upload process, the users saw a status window which refreshed every two seconds and had a pleasant animated GIF to enhance their experience, as well as several metrics on the status of the upload. One user's upload took 45 minutes because of a degraded network connection, yet the file arrived without errors.

The system converted CSV files that users uploaded into RDF and saved them to the RDF store during the upload process. Because of the use of the UPLOAD_HOOK approach for processing uploaded data, the mod_perl-enabled Apache processes never grew in size or leaked memory as a result of handling the upload content.
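
Processing data as it arrives has one wrinkle: a chunk handed to the hook can end in the middle of a line. This is presumably why the handler passes `$fragment` into and out of its validation routine. The sketch below illustrates that carry-over technique on its own. It is not the KSYNC validation code: `make_chunk_processor` and its field-count check are hypothetical stand-ins for validating answers against a poll, and the delimiter is assumed to be a literal string.

```perl
use strict;
use warnings;

# Build a chunk processor. $expected_fields is the number of columns a
# valid row must have; $delimiter is a literal string such as "," or "\t".
# All names here are hypothetical.
sub make_chunk_processor {
    my ( $expected_fields, $delimiter ) = @_;
    my $fragment = '';    # Partial line left over from the previous chunk
    my %totals   = ( valid => 0, invalid => 0, bytes => 0 );

    # Count one complete line as valid or invalid by its field count.
    my $tally = sub {
        my ($line) = @_;
        $line =~ s/\r\z//;    # Tolerate CRLF line endings
        my @fields = split /\Q$delimiter\E/, $line, -1;
        $totals{ @fields == $expected_fields ? 'valid' : 'invalid' }++;
    };

    # Called once per chunk, in arrival order.
    my $process = sub {
        my ($data) = @_;
        $totals{bytes} += length $data;

        # Prepend the leftover, then keep only the complete lines.
        my @lines = split /\n/, $fragment . $data, -1;
        $fragment = @lines ? pop @lines : '';    # Text after the last newline
        $tally->($_) for @lines;
        return;
    };

    # Called once after the last chunk, to count a final line that has
    # no trailing newline.
    my $finish = sub {
        $tally->($fragment) if length $fragment;
        $fragment = '';
        return {%totals};
    };

    return ( $process, $finish );
}

# Three chunks whose boundaries fall in the middle of lines.
my ( $process, $finish ) = make_chunk_processor( 3, ',' );
$process->($_) for "a,b,c\nd,e", ",f\ng,h\ni,j,", "k";
my $totals = $finish->();
print "valid=$totals->{valid} invalid=$totals->{invalid} bytes=$totals->{bytes}\n";
```

Only the running totals and the current partial line stay in memory, so the footprint does not depend on the size of the upload. That matches the flat memory use the article reports for the Apache processes.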
