#!/usr/bin/env perl

# Prefetcher worker.
#
# Repeatedly pulls a batch of crawl tasks from an OpenResty server (view
# PrefetcherGetTasks), fetches each task's URL with WWW::Prefetcher through a
# proxy picked at random from the HttpProxy model (weighted by each proxy's
# "size" field), and reports the ids of good ("P") and bad ("E") pages back
# through the PrefetcherSubmit view.
#
# Memcached is shared with other workers for two purposes:
#   * a per-host lock, expiring after --crawl-cycle seconds, so that a host
#     is crawled at most once per cycle;
#   * a counter, expiring after --fetch-task-cycle seconds, that roughly caps
#     task fetches at --fetch-task-limit per cycle.

#use Smart::Comments;

use strict;
use warnings;

use FindBin;
use lib "$FindBin::Bin/../lib";

use WWW::OpenResty::Simple;
use WWW::Prefetcher;
use Getopt::Long;
use WWW::Curl::Easy;
use Cache::SizeAwareMemoryCache;
#use Digest::MD5 qw(md5_hex);
use Memcached::libmemcached qw(
    memcached_create
    memcached_server_add
    memcached_add
    memcached_free
    memcached_decrement
);

sub log_it(@);
sub pick_proxy();

my $fetch_task_key   = "fetch_tasks_by_openresty_key";
my $fetch_task_cycle = 1;
my $fetch_task_limit = 2;

GetOptions(
    'help|h'             => \(my $help),
    'count|c=i'          => \(my $task_count),
    'timeout|t=i'        => \(my $timeout),
    'server|s=s'         => \(my $server),
    'user|u=s'           => \(my $user),
    'password|p=s'       => \(my $password),
    'crawl-cycle=i'      => \(my $crawl_cycle),
    'fetch-task-cycle=i' => \$fetch_task_cycle,
    'fetch-task-limit=i' => \$fetch_task_limit,
    'user-agent|a=s'     => \(my $user_agent),
    'memc=s'             => \(my $memc_servers),
) or help(1);

if ($help) { help(0) }

# Prints the usage text to STDERR and exits with the given code.
sub help {
    my $exit_code = shift;
    warn <<"_EOC_";
Usage: $0 [options]

Options:
    --count|-c           Size of every task bulk.
    --timeout|-t         Crawling HTTP request timeout.
    --server|-s          OpenResty server hostname.
    --user|-u            OpenResty user name.
    --password|-p        OpenResty account's password.
    --crawl-cycle        Crawling cycle in seconds for every host.
    --fetch-task-cycle   Fetching tasks cycle in seconds (default: 1)
    --fetch-task-limit   Fetching tasks limit (default: 2)
    --memc               Memcached server list for crawling frequency control.
    --user-agent|-a      Specify the User-Agent used.
_EOC_
    exit($exit_code);
}

# curl error codes that say nothing about the page itself (libcurl:
# 2 FAILED_INIT, 5 COULDNT_RESOLVE_PROXY, 26 READ_ERROR, 27 OUT_OF_MEMORY,
# 81 AGAIN).  A page failing with one of these is marked as processed but is
# reported neither as P nor as E in the current round.
my %page_neutral_curl_error = map { $_ => 1 } 2, 5, 26, 27, 81;

# CSS URLs that failed with any other curl error; the "before" hook skips
# them until their cache entry expires.
my $bad_css_cache = Cache::SizeAwareMemoryCache->new({
    namespace          => 'prefetcher',
    default_expires_in => 86400,    # one day
    max_size           => 10000,
});

# Per-batch bookkeeping shared with the prefetcher callbacks.
my $current_page_id;                      # id of the task being processed
my ($lower_id, $upper_id);                # id range of the current batch
my (%processed_ids, @ok_ids, @nok_ids);   # first verdict per page id wins

$task_count   or die "No --count specified.\n";
$server       or die "No --server specified.\n";
$user         or die "No --user specified.\n";
$password     or die "No --password specified.\n";
$timeout      or die "No --timeout specified.\n";
$crawl_cycle  or die "No --crawl-cycle specified.\n";
$memc_servers or die "No --memc specified.\n";

my $memc = memcached_create();
# END blocks are registered at compile time, so this also runs when the
# script exits before memcached_create() (--help, a missing option, ...).
END { memcached_free($memc) if $memc; }

for my $memc_server (split /\s*,\s*|\s+/, $memc_servers) {
    next unless length $memc_server;    # e.g. leading whitespace
    my ($host, $port) = split /:/, $memc_server;
    $port ||= 11211;
    memcached_server_add($memc, $host, $port);
}

my $tasks;                    # current batch, set by fetch_tasks()
my $proxy_list;               # proxies, set by load_proxies()
my @proxy_sizes;              # per-proxy weights used by pick_proxy()
my $proxy_total_size = 0;     # sum of @proxy_sizes

my $resty = WWW::OpenResty::Simple->new({
    server   => $server,
    user     => $user,
    password => $password,
});

safe_login();

my $prefetcher = WWW::Prefetcher->new({
    timeout    => $timeout,
    user_agent => $user_agent,

    # Returns 0 to skip fetching $$rurl, 1 to fetch it.
    before => sub {
        my $rurl = shift;

        # Skip CSS already known to be broken.
        return 0 if $bad_css_cache->get($$rurl);

        if (!$processed_ids{$current_page_id}
            && substr($$rurl, -4) eq '.xml')
        {
            # we reject apparent XML docs
            log_it "[info] Marked the XML page $$rurl as E.";
            push @nok_ids, $current_page_id;
            return 0;
        }
        return 1;
    },

    on_success => sub {
        my ($rurl, $curl) = @_;
        my $status_code = $curl->getinfo(CURLINFO_HTTP_CODE);
        #log_it "Status is ". $status_code;

        if (!$processed_ids{$current_page_id}) {
            # ensure this is not a CSS or something
            if ($status_code < 400) {
                my $content_type = $curl->getinfo(CURLINFO_CONTENT_TYPE);
                $content_type = '' unless defined $content_type;
                if ($content_type =~ /html/i) {
                    push @ok_ids, $current_page_id;
                }
                else {
                    log_it "[info] Page $$rurl has a bad content-type: "
                        . $content_type . " (marking as E)";
                    push @nok_ids, $current_page_id;
                }
            }
            else {
                log_it "[info] Page $$rurl has bad status code: $status_code (marking as E)";
                push @nok_ids, $current_page_id;
            }
        }
        $processed_ids{$current_page_id} = 1;
    },

    on_error => sub {
        my ($rurl, $curl, $retcode) = @_;
        log_it "[error] Failed to process $$rurl: "
            . $curl->strerror($retcode) . " ($retcode)";

        if (!$page_neutral_curl_error{$retcode}) {
            if ($$rurl =~ /\.css$/i) {
                $bad_css_cache->set($$rurl, 1);
            }
            elsif (!$processed_ids{$current_page_id}) {
                log_it "[info] Marked the flawed page as E (curl returns error code $retcode)";
                push @nok_ids, $current_page_id;
            }
        }
        $processed_ids{$current_page_id} = 1;
    },
});

log_it "[info] Prefetcher started. (pid: $$)";

load_proxies();
my $proxies_loaded_at = time;

while (1) {
    # Refresh the proxy list every 30 minutes.
    if (time - $proxies_loaded_at > 30 * 60) {
        load_proxies();
        $proxies_loaded_at = time;
    }

    @ok_ids        = ();
    @nok_ids       = ();
    %processed_ids = ();

    # fetch tasks frequency control.
    my $fetch_times = 0;
    if (memcached_decrement($memc, $fetch_task_key, 1, $fetch_times)) {
        if ($fetch_times == 0) {
            log_it("[info] fetching tasks refused.");
            sleep 1;
            next;
        }
    }
    elsif (!memcached_add($memc, $fetch_task_key, $fetch_task_limit,
                          $fetch_task_cycle))
    {
        # Another worker created the counter first, or memcached is
        # unreachable: pause briefly, then fetch anyway (best effort).
        sleep 1;
    }

    fetch_tasks();

    my $proxy_ind = pick_proxy();
    my $proxy     = $proxy_list->[$proxy_ind];
    my $proxy_str = $proxy->{hostname} . ':' . $proxy->{port};
    $prefetcher->http_proxy($proxy_str);
    ### $proxy_ind
    ### $proxy_str

    log_it "[info] Doing tasks using proxy " . $proxy->{id} . " ($proxy_str)...";

    my $skipped_tasks = process_tasks($tasks);
    if (my $count = @$skipped_tasks) {
        log_it("[info] $count tasks skipped due to global crawlling locking.");
        # try to re-do skipped tasks prevented by the global crawlling lock:
        #$skipped_tasks = process_tasks($skipped_tasks);
        #if ($count = @$skipped_tasks) {
        #    log_it("[info] Still $count tasks skipped after the second trial.");
        #}
    }

    $lower_id = $tasks->[0]->{id};
    $upper_id = $tasks->[-1]->{id};

    log_it "[info] Submitting tasks...";
    submit_tasks($proxy->{id});
}

# Runs the prefetcher over every task in $batch (an array ref of task hashes
# with "id" and "url"), honouring the shared per-host crawl lock.
# Tasks whose URL is not http(s) are marked as E.
# Returns an array ref of the tasks skipped because their host lock could not
# be taken (already held, or memcached failed).
sub process_tasks {
    my $batch = shift;
    my @skipped_tasks;

    for my $i (0 .. $#{$batch}) {
        my $task = $batch->[$i];
        ### $task
        $current_page_id = $task->{id};

        if ($task->{url} !~ m{^https?://([^/:?#\s]+)\S*$}i) {
            log_it "[warn] Invalid protocol in URL: " . $task->{url}
                . " (marking as E)";
            push @nok_ids, $current_page_id;
            next;
        }

        # Lock key is the whole (case-folded) host name.
        my $host = lc $1;
        #my $host_key = md5_hex($host);

        # memcached_add only succeeds when the key is absent; the key expires
        # after $crawl_cycle seconds, which throttles crawling per host.
        if (memcached_add($memc, $host, 1, $crawl_cycle)) {
            eval { $prefetcher->process(\($task->{url}), 'html'); };
            if ($@) { log_it "[warn] Unexpected exception thrown: $@" }
            log_it "[info] Completed 10 pages." if ($i + 1) % 10 == 0;
        }
        else {
            if ($memc->errstr !~ /NOT STORED/) {
                log_it "[warn] Failed to access global host crawlling lock for host $host: "
                    . $memc->errstr;
            }
            push @skipped_tasks, $task;
        }
    }

    return \@skipped_tasks;
}

# (Re)loads the proxy list from the OpenResty HttpProxy model, retrying with
# back-off until a non-empty list is obtained, then recomputes the weights
# used by pick_proxy().
sub load_proxies {
    my $delay = 15;    # sec

    while (1) {
        my $list = eval { $resty->get('/=/model/HttpProxy/~/~') };
        if ($@) {
            log_it $@;
        }
        elsif (ref $list ne 'ARRAY' || !@$list) {
            log_it "[warn] No proxy found on the OpenResty server.\n";
        }
        else {
            $proxy_list = $list;
            last;
        }
        backoff_sleep(\$delay);
    }

    # Missing sizes count as weight 0.
    @proxy_sizes      = map { $_->{size} || 0 } @$proxy_list;
    $proxy_total_size = 0;
    $proxy_total_size += $_ for @proxy_sizes;
    ### $proxy_total_size
}

# Returns the index into $proxy_list of a randomly chosen proxy, each proxy
# weighted by its "size".  Falls back to a uniform choice when every weight
# is zero.
sub pick_proxy() {
    return int rand @proxy_sizes if $proxy_total_size <= 0;

    my $val = (int rand $proxy_total_size) + 1;
    my $acc = 0;
    for my $i (0 .. $#proxy_sizes) {
        $acc += $proxy_sizes[$i];
        ### $val
        ### $acc
        return $i if $val <= $acc;    # hit!
    }

    log_it "Rand assumption broke";
    exit 1;
}

# Logs in to OpenResty, retrying with back-off until it succeeds.
sub safe_login {
    my $delay = 15;    # sec

    while (1) {
        eval { $resty->login($user, $password); };
        if (!$@) {
            log_it "[info] OpenResty login successful.";
            return;
        }
        log_it "[error] $@";
        backoff_sleep(\$delay);
    }
}

# Fetches the next non-empty batch of tasks into $tasks, re-logging in when
# the session has expired and retrying with back-off otherwise.
sub fetch_tasks {
    my $delay = 15;    # sec

    while (1) {
        my $got = eval {
            $resty->get("/=/view/PrefetcherGetTasks/count/$task_count");
        };
        my $err = $@;    # copy before safe_login() can clobber $@

        if ($err) {
            if ($err =~ /Login required/i) {
                log_it "[info] Trying to re-login to OpenResty...";
                safe_login();
                $delay = 1;
            }
            else {
                log_it "[error] $err";
            }
        }
        elsif (ref $got ne 'ARRAY' || !@$got) {
            log_it "[warn] No task found in OpenResty.";
        }
        else {
            $tasks = $got;
            log_it "[info] Got " . scalar(@$tasks) . " tasks.";
            return;
        }
        backoff_sleep(\$delay);
    }
}

# Shared retry back-off: when $$delay_ref is below 180 seconds, sleeps that
# long and quadruples it for the next call; otherwise sleeps 180 seconds.
sub backoff_sleep {
    my ($delay_ref) = @_;
    if ($$delay_ref < 180) {    # 3 min
        sleep $$delay_ref;
        $$delay_ref *= 4;
    }
    else {
        sleep 180;
    }
    return;
}

# Writes a timestamped, single-line entry to STDERR.  All arguments are
# concatenated; embedded newlines become spaces and trailing whitespace is
# dropped.
sub log_it (@) {
    my $s = join '', @_;
    my ($sec, $min, $hour, $mday, $mon, $year) = localtime;
    $year += 1900;
    $mon  += 1;
    $s =~ s/\n+/ /g;
    $s =~ s/\s+\z//;
    warn sprintf("[%04d-%02d-%02d %02d:%02d:%02d] %s\n",
                 $year, $mon, $mday, $hour, $min, $sec, $s);
}

# Formats a list ref as a PostgreSQL array literal: [1, 2, 3] -> '{1,2,3}'.
sub list2pgarray {
    my $list = shift;
    return '{' . join(',', @$list) . '}';
}

# Reports the current batch's good (@ok_ids) and bad (@nok_ids) page ids for
# proxy $proxy_id through the PrefetcherSubmit view.  Tries at most 3 times,
# re-logging in when the session has expired; logs an error if all fail.
sub submit_tasks {
    my ($proxy_id) = @_;

    if (!@ok_ids && !@nok_ids) {
        log_it "[warn] No tasks to be submitted.";
        return;
    }

    my $delay = 15;    # sec
    for my $attempt (1 .. 3) {    # here we only try 3 times...
        my $res = eval {
            $resty->get(
                "/=/view/PrefetcherSubmit/~/~",
                {
                    ok_ids  => list2pgarray(\@ok_ids),
                    nok_ids => list2pgarray(\@nok_ids),
                    proxy   => $proxy_id,
                }
            );
        };
        my $err = $@;    # copy before safe_login() can clobber $@

        if (!$err) {
            # The response shape is not guaranteed; don't let it kill us.
            my $affected = eval { $res->[0]->{affected} };
            $affected = '?' unless defined $affected;
            log_it "[info] Submitted tasks (" . scalar(@ok_ids) . " P, "
                . scalar(@nok_ids) . " E, " . "$affected affected).";
            return;    # done!
        }

        if ($err =~ /Login required/i) {
            log_it "[info] Trying to re-login to OpenResty...";
            safe_login();
            $delay = 1;
        }
        else {
            log_it "[error] $err";
        }
        backoff_sleep(\$delay);
    }

    log_it "[error] Failed to submit tasks with ids $lower_id..$upper_id.";
}