How to Automate the Fetching Process


The Fetching process is one of the most essential processes for a production search engine. Automation of that process is also essential. This brief document will cover the JobStream.py python script that is used to automate the fetching process including fetching, updating the crawl database, and merging fetches into single segments. Please note that it is assumed that storage of the fetches is occuring on the Hadoop DFS (although the script could be altered to run on the local file system).

The JobStream.py Process

JobStream.py is a single class python script that automates the webpage fetching / update process.

The job starts by dumping the crawl database to a local disk from the master crawl database in the distributed file system (dfs). The dump files are then parsed to extract only unfetched urls which are appended to a single output file. Currently this is setup for only http urls. This url file is then split into multiple runs of x number of pages each. This is performed in a temp folder on the local file system. Each run is then consecutively loaded into the dfs, and run through the inject, generate, fetch, updatedb, and readdb commands. This happens in a temp folder on the dfs. There are options to setup x number of fetch runs before merging occurs.

The master and temp crawldb and segments folders are then merged into a new folder and then temp folders on the dfs as well as the temp folders on the local file system are deleted. The master folder is renamed to crawl-backup thereby giving us a single backup of previous data. This backup folder is removed and replaced for each url split.

The process removes the local url load directory before moving on to the next url split. The script finishes when all url splits have been run. There are also options to stop the script after the current fetch-merge run. You can then resume from that point at a later time.

The JobStream.py script automates only the fetching, updating, and merging processes. Inverting links and indexing are not covered yet although development of a more complete automation framework is underway.

The JobStream Options

To start using the JobStream file you will probably want to set some configuration variables in the main def of the script. If you prefer you can override most of these options on the command line. This is at the bottom of the script staring on line 374 and looks like this:

"""
Main method that starts up the script.
"""
def main(argv):

  # set the default values
  resume = 0
  execute = 0
  checkfile = "jobstream.stop"
  logconf = "logging.conf"
  jobdir = "/d01/jobstream"
  nutchdir = "/d01/revolution"
  masterdir = "crawl"
  backupdir = "crawl-backup"
  dfsdumpdir = "crawldump"
  tempdir = "crawltemp"
  splitsize = 500000
  fetchmerge = 3

Th checkfile variable is the name of the stop file to check for. If this file is present in the same directory as the JobStream.py script, the script will stop executing after the current fetch-update-merge run.

The logconf variable is the name of the logging file. You can configure logging for this script in a logging conf file that is in the same directory as the JobStream script. A sample logging file is provided later. Logging is set to go to the console by default.

The jobdir variable is the path of the local job directory. The one that the JobStream script live in and execute out of.

The nutchdir variable is the path to the base of the local nutch installation.

The masterdir variable is the path to the master directory on the dfs. This is the dfs directory that holds the crawldb and segments. The is also what the temp directory that holds the operations on the dfs will be named after the merge processes are finished and the old master directory is backed up.

The backupdir variable is the path to the backup directory on the dfs. This is where the old master directory will be moved to when the newly fetched and merged master is complete.

The dfsdump variable is the location on the dfs where the crawl database will be dumped to before being moved to the local file system for processing. This is deleted as soon as the files are moved to local.

The tempdir variable is not yet implemented but will be the location on the dfs where the temporary fetching and merging operations will occur. For right now this is hardcoded as crawltemp in the users directory on the dfs.

The splitsize variable is the number of urls to fetch in each load. The default is to fetch 500,000 urls in each load.

The fetchmerge variable is the number of fetches to run before merging. We want to keep the database up to date but we also don't want to waste processing time. By having more fetches per merge, the merge process can merge multiple segments and crawl database in a single execution instead of once per fetch. You can set this to 1 if you want to update once per fetch but the entire process will just take longer to complete. It is recommended to set this to between 3 and 5 depending on your bandwidth, processing power, and the size of your crawl databases and segments. The default is three fetches merged after the third fetch is complete.

Below is an example of the JobStream help screen. You can get to this screen at any point in time by using the -h or --help options.

JobStream.py [-hrjnlsmc]
             [-e | --execute] runs the jobstream.
             [-h | --help] prints this help and usage message.
             [-r | --resume] to resume a previous run.
             [-l | --logconf] Overrides logging conf file [logging.conf].
             [-j | --jobdir] The job directory, [/d01/jobstream].
             [-n | --nutchdir] The nutch home, [/d01/revolution].
             [-m | --masterdir] The dfs master directory [crawl].
             [-b | --backupdir] The master backup directory, [crawl-backup].
             [-s | --splitsize] The number of urls per load [500000].
             [-f | --fetchmerge] The number of fetches to run before merge [1].

The -e or --execute options run the jobstream. Use these option if you are starting a new jobstream process. If you are resuming an old process that you stopped before completion, use the -r or --resume options.

The rest of the options are pretty self explanatory and simply override the options that are set in the main def of the script. To start up a jobstream that will fetch 5 runs of 1,000,000 urls each before merging you would use a command like this.

./JobStream.py -s 1000000 -f 5
or alternatively
./JobStream.py --splitsize=1000000 --fetchmerge=5

This assumes that you have set basic options in the main def of the script and are overriding basic options.

The JobStream.py Script

#!/usr/bin/python

import sys
import getopt
import re
import logging
import logging.config
import commands
import os
import os.path

"""
-----------------------------------------------------------------------------
The JobStream class automates the webpage fetching / update process.

The job starts by dumping the crawl database to a local disk from the master
crawl database in the distributed file system (dfs).  The dump files are then
parsed to extract only unfetched urls which are appended to a single output
file.  This url file is then split into multiple runs of x number of pages
each.  Each run is then consecutively loaded into the dfs, and run through
the inject, generate, fetch, updatedb, and readdb commands.  This happens in
a temp folder.

The master and temp crawldb and segments folders are then merged into a new
folder and then temp folders are deleted.  The master folder is renamed to
crawl-backup thereby giving us a single backup of previous data.  This backup
folder is removed and replaced for each url split.

The process removes the local url load directory before moving on to the next
url split.  The script finishes when all url splits have been run.

Use the -h or the --help flag to get a listing of options.

Program: JobStream Automation
Author: Dennis E. Kubes
Date: December 12, 2006
Revision: 1.2

Revision      | Author            | Comment
-----------------------------------------------------------------------------
20060906-1.0  Dennis E. Kubes     Initial creation of JobStream script.
20060907-1.1                      Added command line options for configuration,
                                  added comments, fixed bugs, added resume and
                                  stop after current url run completes.
20061205-1.2                      Added command line options and logic to run
                                  multiple fetches before merging.  This is to
                                  improve performance by not having to merge
                                  as often.
-----------------------------------------------------------------------------
TODO: Add dump and temp directory configuration options
-----------------------------------------------------------------------------
"""

class JobStream:

  nutchdir = ""
  masterdir = ""
  backupdir = ""
  log = logging.getLogger("jobstream")

  """
  Constructor for the JobStream class.  Passes in the nutch home directory and
  the nutch master directory
  """
  def __init__(self, nutchdir, masterdir, backupdir):

    # set the nutch directory home (where the bin directory for running the
    # nutch commands is), and the master crawl directory on the dfs
    self.nutchdir = nutchdir
    self.masterdir = masterdir
    self.backupdir = backupdir

  """
  Checks the status of result codes in the returned result array and raises an
  error if the result code is not a successful exit.
  """
  def checkStatus(self, result, err):
   if result[0] != 0:
      raise err + " " + result[1]

  """
  Dumps the master crawl database first to the dfs at the dfsdir location and
  then copies to the local file system at localdir.  The dfsdir is removed
  once the dump is copied to the local filesystem.
  """
  def dumpCrawlDb(self, dfsdir, localdir):

    # create the nutch commands
    crawldb = self.masterdir + "/crawldb"
    dump = self.nutchdir + "/bin/nutch readdb " + crawldb + " -dump " + dfsdir
    copylocal = self.nutchdir + "/bin/hadoop dfs -copyToLocal " + dfsdir + " " + localdir
    deletetemp = self.nutchdir + "/bin/hadoop dfs -rm " + dfsdir

    # execute the nutch commands and get each exit code, if it exited
    # with an error raise an exception
    nutchcmds = (dump, copylocal, deletetemp)
    for curcmd in nutchcmds:
      self.log.info("Running: " + curcmd)
      result = commands.getstatusoutput(curcmd)
      self.checkStatus(result, "Error occurred while running command " + curcmd)

  """
  Parses the crawl dump files on the local filesystem for unfetched urls and
  appends all of the unfetched urls to a single output file.
  """
  def parseCrawlDump(self, indir, outfile):

    # open the output file in append mode
    outhandle = open(outfile, "a")

    # loop through each file in the dump directory but only get the
    # part-xxxxx files because there are .crc file in the same directory
    for dumpfile in os.listdir(indir):

      if dumpfile[0:4] == "part":

        curfile = indir + "/" + dumpfile
        self.log.info("Processing dump file: " + curfile)

        # set the input and output files
        inhandle = open(curfile, "r")

        # setup the regular expressions for searching and matching
        validUrl = "^http://(?! )(\w+[:]?){2,}(/?|[^ \n\r\"]+[\w/])(?=[\s\.,)\'\"\]])"
        urlRegex = re.compile(validUrl)
        unfetchedRegex = re.compile("DB_unfetched")

        # loop over the file, and match lines that are valid http:// urls where the
        # next line says that it is unfetched.  Write those lines to the outfile
        prevline = ""
        for line in inhandle:
          if urlRegex.search(line):
            prevline = line
          elif unfetchedRegex.search(line) and prevline != "":
            fields = prevline.strip("\n").split("\t")
            url = fields[0]
            prevline = ""
            outhandle.write(url + "\n")

        # close the file connections
        inhandle.close()

    # close the output file
    outhandle.close()

  """
  Creates the url splits.  Each split will be a file that contains the number of
  splitsize urls.  Each split is in its own number directory with a file in the
  directory named urls that contains the actual urls to be fetched.
  """
  def createUrlLoads(self, splitsize, urllist, outdir):

    # if the directory that holds the url loads doesn't exists, create it
    if not os.path.isdir(outdir):
      os.mkdir(outdir)

    # determine the total number of urls in the file and create a padding string
    # used to name the output folders correctly
    total_urls = 0
    urllinecount = open(urllist, "r")
    for line in urllinecount:
      total_urls += 1
    urllinecount.close()
    numsplits = total_urls / splitsize
    padding = "0" * len(repr(numsplits))

    # create the url load folder
    filenum = 0
    strfilenum = repr(filenum)
    urloutdir = outdir + "/urls-" + padding[len(strfilenum):] + strfilenum
    os.mkdir(urloutdir)
    urlfile = urloutdir + "/urls"

    # open the input and output files
    self.log.info("Creating load file: " + urlfile)
    inhandle = open(urllist, "r")
    outhandle = open(urlfile, "w")

    # loop through the file
    for linenum, line in enumerate(inhandle):

      # if we have come to a split then close the current file, create a new
      # url folder and open a new url file
      if linenum > 0 and linenum % splitsize == 0:

        filenum += 1
        strfilenum = repr(filenum)
        urloutdir = outdir + "/urls-" + padding[len(strfilenum):] + strfilenum
        os.mkdir(urloutdir)
        urlfile = urloutdir + "/urls"
        self.log.info("Creating load file: " + urlfile)
        outhandle.close()
        outhandle = open(urlfile, "w")

      # write the url to the file
      outhandle.write(line)

    # close the input and output files
    inhandle.close()
    outhandle.close()

  """
  Runs all of the url loads.  For each of the url load directories it loads the
  url file into the dfs and then runs the inject, generate, fetch, readdb, and
  updatedb commands.  This temp load is then merged with the master database to
  create a new database.  The old database is stored as backup, the new database
  takes the place as the master database, and the temp database is removed.

  At the end of each url load the process will check to see if a stopfile is
  present on the system.  If the file is present, this indicates to the process
  to stop running.  The stop file will be removed and the process will terminate.
  The process can be restarted later with the -r or --resume flags and it will
  pick up where it left off on the next url load.
  """
  def runFetchMerges(self, urllistdir, stopfile, fetchmerges):

    # get the folders for the url directory and sort them, this is reason the
    # folders needs to be named correctly, so they are loaded in order
    urldirs = os.listdir(urllistdir)
    urldirs.sort()
    counter = 0
    numdirs = len(urldirs)

    # variables for the crawldb and segments directories on the dfs
    mastercrawldbdir = self.masterdir + "/crawldb"
    mastersegsdir = self.masterdir + "/segments"

    # for each of the url loads
    while counter < numdirs:

      # list to hold individual fetches and segments
      tempseglist = []
      tempdblist = []

      # run the number of fetches before merging, this allows us to improve
      # performance by not having to merge as often
      for curl in urldirs[counter:counter + fetchmerges]:

        # set the current load directory
        curloaddir = urllistdir + "/" + curl
        self.log.info("Starting current load: " + curloaddir)

        # create the nutch commands for load, inject, generate
        tempdb = "crawltemp/crawldb" + str(counter)
        tempdblist.append(tempdb)
        load = self.nutchdir + "/bin/hadoop dfs -put " + curloaddir + " crawltemp/urls" + str(counter)
        inject = self.nutchdir + "/bin/nutch inject " + tempdb + " crawltemp/urls" + str(counter)
        generate = self.nutchdir + "/bin/nutch generate " + tempdb + " crawltemp/segments" + str(counter)

        # run the load, inject, and generate commands, check the results, if bad exit
        nutchcmds = (load, inject, generate)
        for curcmd in nutchcmds:
          self.log.info("Running: " + curcmd)
          result = commands.getstatusoutput(curcmd)
          self.checkStatus(result, "Error occurred while running command" + curcmd)

        # get the current segment to fetch
        self.log.info("Getting segment to fetch.")
        getsegment = self.nutchdir + "/bin/hadoop dfs -ls crawltemp/segments" + str(counter)
        self.log.info("Running: " + getsegment)
        result = commands.getstatusoutput(getsegment)
        self.checkStatus(result, "Error occurred while running command" + getsegment)

        # fetch the current segment
        outar = result[1].splitlines()
        output = outar[-1]
        tempseg = output.split()[0]
        tempseglist.append(tempseg)
        fetch = self.nutchdir + "/bin/nutch fetch " + tempseg
        self.log.info("Starting fetch for: " + tempseg)
        self.log.info("Running: " + fetch)
        result = commands.getstatusoutput(fetch)
        self.checkStatus(result, "Error occurred while running command" + fetch)
        self.log.info("Finished fetch for: " + tempseg)

        # update the crawldb from the current segment
        self.log.info("Updating " + tempdb + " from " + tempseg + ".")
        updatetemp = self.nutchdir + "/bin/nutch updatedb " + tempdb + " " + tempseg
        self.log.info("Running: " + updatetemp)
        result = commands.getstatusoutput(updatetemp)
        self.checkStatus(result, "Error occurred while running command" + updatetemp)

         # remove the current url load directory
        self.log.info("Removing current local load directory: " + curloaddir)
        os.remove(curloaddir + "/urls")
        os.rmdir(curloaddir)

        # log the current url finished
        self.log.info("Finished current load: " + curloaddir)

        #increment the counter
        counter += 1

      # merge the crawldbs
      self.log.info("Merging master and temp crawldbs.")
      crawlmerge = self.nutchdir + "/bin/nutch mergedb mergetemp/crawldb " + \
        mastercrawldbdir + " " + " ".join(tempdblist)
      self.log.info("Running: " + crawlmerge)
      result = commands.getstatusoutput(crawlmerge)
      self.checkStatus(result, "Error occurred while running command" + crawlmerge)

      # merge the segments
      self.log.info("Merging master and temp segments")
      getsegment = self.nutchdir + "/bin/hadoop dfs -ls " + mastersegsdir
      result = commands.getstatusoutput(getsegment)
      self.checkStatus(result, "Error occurred while running command" + getsegment)
      outar = result[1].splitlines()
      output = outar[-1]
      masterseg = output.split()[0]
      mergesegs = self.nutchdir + "/bin/nutch mergesegs mergetemp/segments " + \
        masterseg + " " + " ".join(tempseglist)
      self.log.info("Running: " + mergesegs)
      result = commands.getstatusoutput(mergesegs)
      self.checkStatus(result, "Error occurred while running command" + mergesegs)

      # back up the master, rename the merged to master, and remove the temp
      self.log.info("Backing up, deleting, and renaming of merge resources")
      rmoldback = self.nutchdir + "/bin/hadoop dfs -rm " + self.backupdir
      self.log.info("Running: " + rmoldback)
      result = commands.getstatusoutput(rmoldback)
      self.checkStatus(result, "Error occurred while running command" + rmoldback)
      masterback = self.nutchdir + "/bin/hadoop dfs -mv " + self.masterdir + " " + self.backupdir
      self.log.info("Running: " + masterback)
      result = commands.getstatusoutput(masterback)
      self.checkStatus(result, "Error occurred while running command" + masterback)
      mergemove = self.nutchdir + "/bin/hadoop dfs -mv mergetemp " + self.masterdir
      self.log.info("Running: " + mergemove)
      result = commands.getstatusoutput(mergemove)
      self.checkStatus(result, "Error occurred while running command" + mergemove)
      deltemp = self.nutchdir + "/bin/hadoop dfs -rm crawltemp"
      self.log.info("Running: " + deltemp)
      result = commands.getstatusoutput(deltemp)
      self.checkStatus(result, "Error occurred while running command" + deltemp)

      # read the current master database stats and put the output into the log
      self.log.info("Reading database statistics")
      dbname =  self.masterdir + "/crawldb"
      readdb = self.nutchdir + "/bin/nutch readdb " + dbname + " -stats"
      self.log.info("Running: " + readdb)
      result = commands.getstatusoutput(readdb)
      self.checkStatus(result, "Error occurred while running command" + readdb)
      output = result[1]
      self.log.info(output)

      # check to see if the stop file is present, if it is remove the stopfile
      # and exit the script successfully
      if os.path.isfile(stopfile):
        self.log.info("Found stopfile " + stopfile + ".  Removing stopfile and " +
          "exiting script.")
        os.remove(stopfile)
        sys.exit(0)

"""
Prints out the usage for the command line.
"""
def usage():
  usage = ["JobStream.py [-hrjnlsmc]\n"]
  usage.append("            [-e | --execute] runs the jobstream.\n")
  usage.append("            [-h | --help] prints this help and usage message.\n")
  usage.append("            [-r | --resume] to resume a previous run.\n")
  usage.append("            [-l | --logconf] Overrides logging conf file [logging.conf].\n")
  usage.append("            [-j | --jobdir] The job directory, [/d01/jobstream].\n")
  usage.append("            [-n | --nutchdir] The nutch home, [/d01/revolution].\n")
  usage.append("            [-m | --masterdir] The dfs master directory [crawl].\n")
  usage.append("            [-b | --backupdir] The master backup directory, [crawl-backup].\n")
  usage.append("            [-s | --splitsize] The number of urls per load [500000].\n")
  usage.append("            [-f | --fetchmerge] The number of fetches to run before merging [1].\n")
  message = " ".join(usage)
  print message

"""
Main method that starts up the script.
"""
def main(argv):

  # set the default values
  resume = 0
  execute = 0
  checkfile = "jobstream.stop"
  logconf = "logging.conf"
  jobdir = "/d01/jobstream"
  nutchdir = "/d01/revolution"
  masterdir = "crawl"
  backupdir = "crawl-backup"
  dfsdumpdir = "crawldump"
  tempdir = "crawltemp"
  splitsize = 500000
  fetchmerge = 3

  try:

    # process the command line options
    opts, args = getopt.getopt(argv, "hrej:n:l:s:m:b:f:d:t:", ["help","logconf=","jobdir=",
      "resume","nutchdir=","splitsize=","masterdir=","backupdir=","fetchmerge=", "execute",
      "dumpdir=","tempdir="])

    # if no arguments print usage
    if len(argv) == 0:
      usage()
      sys.exit()

    # loop through all of the command line options and set the appropriate
    # values, overriding defaults
    for opt, arg in opts:
      if opt in ("-h", "--help"):
        usage()
        sys.exit()
      elif opt in ("-r", "--resume"):
        resume = 1
      elif opt in ("-e", "--execute"):
        execute = 1
      elif opt in ("-l", "--logconf"):
        logconf = arg
      elif opt in ("-j", "--jobdir"):
        jobdir = arg
      elif opt in ("-n", "--nutchdir"):
        nutchdir = arg
      elif opt in ("-m", "--masterdir"):
        masterdir = arg
      elif opt in ("-b", "--backupdir"):
        backupdir = arg
      elif opt in ("-d", "--dumpdir"):
        dfsdumpdir = arg
      elif opt in ("-t", "--tempdir"):
        tempdir = arg
      elif opt in ("-s", "--splitsize"):
        splitsize = int(arg)
      elif opt in ("-f", "--fetchmerge"):
        fetchmerge = int(arg)

  except getopt.GetoptError:

    # if an error happens print the usage and exit with an error
    usage()
    sys.exit(2)

  # if we are running the jobstream
  if execute:

    # setup the file and folder variables, setup the logging
    logging.config.fileConfig(logconf)
    localdumpdir = jobdir + "/localdump"
    loaddir = jobdir + "/urllists"
    unfetched = localdumpdir + "/unfetched.urls"
    checkpath = jobdir + "/" + checkfile

    # create the job stream object
    jobStream = JobStream(nutchdir, masterdir, backupdir)

    # if we not resuming then dump, parse and create the url loads
    if not resume:
      jobStream.dumpCrawlDb(dfsdumpdir, localdumpdir)
      jobStream.parseCrawlDump(localdumpdir, unfetched)
      jobStream.createUrlLoads(splitsize, unfetched, loaddir)

    # merging or not run the url loads
    jobStream.runFetchMerges(loaddir, checkpath, fetchmerge)

# if we are running the script from the command line, run the main
# method of the JobStream class
if __name__ == "__main__":
  main(sys.argv[1:])

The JobStream Logging.conf File

[formatters]
keys=simple

[handlers]
keys=console

[loggers]
keys=root,engine

[formatter_simple]
format=%(name)s :%(levelname)s :  %(message)s

[handler_console]
class=StreamHandler
args=[]
formatter=simple

[logger_root]
level=INFO
handlers=console

[logger_engine]
level=INFO
qualname=jobstream
propagate=0
handlers=console

Conclusion

If you want to get further into the operations of the script I suggest reading the source. It is well documented with both documentation and code comments. Alternatively you can send me an email at nutch-dev at dragonflymc dot com if you have any questions.

  • No labels