MediaWiki PHP Cross Reference Collaborative Wikis

Source: /maintenance/storage/recompressTracked.php - 811 lines - 21656 bytes - Summary - Text - Print

Description: Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.

   1  <?php
   2  /**
   3   * Moves blobs indexed by trackBlobs.php to a specified list of destination
   4   * clusters, and recompresses them in the process.
   5   *
   6   * This program is free software; you can redistribute it and/or modify
   7   * it under the terms of the GNU General Public License as published by
   8   * the Free Software Foundation; either version 2 of the License, or
   9   * (at your option) any later version.
  10   *
  11   * This program is distributed in the hope that it will be useful,
  12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14   * GNU General Public License for more details.
  15   *
  16   * You should have received a copy of the GNU General Public License along
  17   * with this program; if not, write to the Free Software Foundation, Inc.,
  18   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  19   * http://www.gnu.org/copyleft/gpl.html
  20   *
  21   * @file
  22   * @ingroup Maintenance ExternalStorage
  23   */
  24  
      // Entry script. $optionsWithArgs is assigned before commandLine.inc is
      // included; presumably commandLine.inc reads it while parsing the command
      // line and defines $args / $options used below -- TODO confirm.
  25  $optionsWithArgs = RecompressTracked::getOptionsWithArgs();
  26  require  __DIR__ . '/../commandLine.inc';
  27  
      // At least one destination cluster is required; otherwise print usage and
      // exit with a non-zero status.
  28  if ( count( $args ) < 1 ) {
  29      echo "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
  30  Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable.
  31  
  32  Options:
  33      --procs <procs>         Set the number of child processes (default 1)
  34      --copy-only             Copy only, do not update the text table. Restart without this option to complete.
  35      --debug-log <file>      Log debugging data to the specified file
  36      --info-log <file>       Log progress messages to the specified file
  37      --critical-log <file>   Log error messages to the specified file
  38  ";
  39      exit( 1 );
  40  }
  41  
      // Build the job from the positional arguments (destination clusters) and
      // the parsed options, then run it as parent or child.
  42  $job = RecompressTracked::newFromCommandLine( $args, $options );
  43  $job->execute();
  44  
  45  /**
  46   * Maintenance script that moves blobs indexed by trackBlobs.php to a specified
  47   * list of destination clusters, and recompresses them in the process.
  48   *
  49   * @ingroup Maintenance ExternalStorage
  50   */
  51  class RecompressTracked {
  52      public $destClusters;
  53      public $batchSize = 1000;
  54      public $orphanBatchSize = 1000;
  55      public $reportingInterval = 10;
  56      public $numProcs = 1;
  57      public $useDiff, $pageBlobClass, $orphanBlobClass;
  58      public $slavePipes, $slaveProcs, $prevSlaveId;
  59      public $copyOnly = false;
  60      public $isChild = false;
  61      public $slaveId = false;
  62      public $noCount = false;
  63      public $debugLog, $infoLog, $criticalLog;
  64      public $store;
  65  
  66      static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' );
  67      static $cmdLineOptionMap = array(
  68          'no-count' => 'noCount',
  69          'procs' => 'numProcs',
  70          'copy-only' => 'copyOnly',
  71          'child' => 'isChild',
  72          'slave-id' => 'slaveId',
  73          'debug-log' => 'debugLog',
  74          'info-log' => 'infoLog',
  75          'critical-log' => 'criticalLog',
  76      );
  77  
  78  	static function getOptionsWithArgs() {
  79          return self::$optionsWithArgs;
  80      }
  81  
  82  	static function newFromCommandLine( $args, $options ) {
  83          $jobOptions = array( 'destClusters' => $args );
  84          foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
  85              if ( isset( $options[$cmdOption] ) ) {
  86                  $jobOptions[$classOption] = $options[$cmdOption];
  87              }
  88          }
  89          return new self( $jobOptions );
  90      }
  91  
  92  	function __construct( $options ) {
  93          foreach ( $options as $name => $value ) {
  94              $this->$name = $value;
  95          }
  96          $this->store = new ExternalStoreDB;
  97          if ( !$this->isChild ) {
  98              $GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
  99          } elseif ( $this->slaveId !== false ) {
 100              $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: ";
 101          }
 102          $this->useDiff = function_exists( 'xdiff_string_bdiff' );
 103          $this->pageBlobClass = $this->useDiff ? 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob';
 104          $this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob';
 105      }
 106  
 107  	function debug( $msg ) {
 108          wfDebug( "$msg\n" );
 109          if ( $this->debugLog ) {
 110              $this->logToFile( $msg, $this->debugLog );
 111          }
 112  
 113      }
 114  
 115  	function info( $msg ) {
 116          echo "$msg\n";
 117          if ( $this->infoLog ) {
 118              $this->logToFile( $msg, $this->infoLog );
 119          }
 120      }
 121  
 122  	function critical( $msg ) {
 123          echo "$msg\n";
 124          if ( $this->criticalLog ) {
 125              $this->logToFile( $msg, $this->criticalLog );
 126          }
 127      }
 128  
 129  	function logToFile( $msg, $file ) {
 130          $header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid();
 131          if ( $this->slaveId !== false ) {
 132              $header .= "({$this->slaveId})";
 133          }
 134          $header .= ' ' . wfWikiID();
 135          wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file );
 136      }
 137  
 138      /**
 139       * Wait until the selected slave has caught up to the master.
 140       * This allows us to use the slave for things that were committed in a
 141       * previous part of this batch process.
 142       */
 143  	function syncDBs() {
 144          $dbw = wfGetDB( DB_MASTER );
 145          $dbr = wfGetDB( DB_SLAVE );
 146          $pos = $dbw->getMasterPos();
 147          $dbr->masterPosWait( $pos, 100000 );
 148      }
 149  
 150      /**
 151       * Execute parent or child depending on the isChild option
 152       */
 153  	function execute() {
 154          if ( $this->isChild ) {
 155              $this->executeChild();
 156          } else {
 157              $this->executeParent();
 158          }
 159      }
 160  
 161      /**
 162       * Execute the parent process
 163       */
 164  	function executeParent() {
 165          if ( !$this->checkTrackingTable() ) {
 166              return;
 167          }
 168  
 169          $this->syncDBs();
 170          $this->startSlaveProcs();
 171          $this->doAllPages();
 172          $this->doAllOrphans();
 173          $this->killSlaveProcs();
 174      }
 175  
 176      /**
 177       * Make sure the tracking table exists and isn't empty
 178       * @return bool
 179       */
 180  	function checkTrackingTable() {
 181          $dbr = wfGetDB( DB_SLAVE );
 182          if ( !$dbr->tableExists( 'blob_tracking' ) ) {
 183              $this->critical( "Error: blob_tracking table does not exist" );
 184              return false;
 185          }
 186          $row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
 187          if ( !$row ) {
 188              $this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );
 189              return false;
 190          }
 191          return true;
 192      }
 193  
 194      /**
 195       * Start the worker processes.
 196       * These processes will listen on stdin for commands.
 197       * This is necessary because text recompression is slow: loading, compressing and
 198       * writing are all slow.
      *
      * Each worker is this same script, started with the parent's options plus
      * --child, --wiki, the destination clusters and its own --slave-id.
      * Exits with status 1 if a worker cannot be started.
 199       */
 200  	function startSlaveProcs() {
 201          $cmd = 'php ' . wfEscapeShellArg( __FILE__ );
          // Forward the parent's options. slave-id is skipped here because it
          // is appended per worker below.
 202          foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
 203              if ( $cmdOption == 'slave-id' ) {
 204                  continue;
 205              } elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
                  // Option with a value: pass the shell-escaped value along
 206                  $cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
 207              } elseif ( $this->$classOption ) {
                  // Boolean flag: only passed when set
 208                  $cmd .= " --$cmdOption";
 209              }
 210          }
 211          $cmd .= ' --child' .
 212              ' --wiki ' . wfEscapeShellArg( wfWikiID() ) .
 213              ' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters );
 214  
 215          $this->slavePipes = $this->slaveProcs = array();
 216          for ( $i = 0; $i < $this->numProcs; $i++ ) {
 217              $pipes = false;
              // The worker's stdin is a pipe written by the parent; its stdout
              // and stderr are sent to the parent's stdout and stderr.
 218              $spec = array(
 219                  array( 'pipe', 'r' ),
 220                  array( 'file', 'php://stdout', 'w' ),
 221                  array( 'file', 'php://stderr', 'w' )
 222              );
 223              wfSuppressWarnings();
 224              $proc = proc_open( "$cmd --slave-id $i", $spec, $pipes );
 225              wfRestoreWarnings();
 226              if ( !$proc ) {
 227                  $this->critical( "Error opening slave process: $cmd" );
 228                  exit( 1 );
 229              }
 230              $this->slaveProcs[$i] = $proc;
 231              $this->slavePipes[$i] = $pipes[0];
 232          }
          // With -1 here, the first dispatch() call starts its search at slave 0
 233          $this->prevSlaveId = -1;
 234      }
 235  
 236      /**
 237       * Gracefully terminate the child processes
 238       */
 239  	function killSlaveProcs() {
 240          $this->info( "Waiting for slave processes to finish..." );
 241          for ( $i = 0; $i < $this->numProcs; $i++ ) {
 242              $this->dispatchToSlave( $i, 'quit' );
 243          }
 244          for ( $i = 0; $i < $this->numProcs; $i++ ) {
 245              $status = proc_close( $this->slaveProcs[$i] );
 246              if ( $status ) {
 247                  $this->critical( "Warning: child #$i exited with status $status" );
 248              }
 249          }
 250          $this->info( "Done." );
 251      }
 252  
 253      /**
 254       * Dispatch a command to the next available slave.
 255       * This may block until a slave finishes its work and becomes available.
 256       */
 257  	function dispatch( /*...*/ ) {
 258          $args = func_get_args();
 259          $pipes = $this->slavePipes;
 260          $numPipes = stream_select( $x = array(), $pipes, $y = array(), 3600 );
 261          if ( !$numPipes ) {
 262              $this->critical( "Error waiting to write to slaves. Aborting" );
 263              exit( 1 );
 264          }
 265          for ( $i = 0; $i < $this->numProcs; $i++ ) {
 266              $slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
 267              if ( isset( $pipes[$slaveId] ) ) {
 268                  $this->prevSlaveId = $slaveId;
 269                  $this->dispatchToSlave( $slaveId, $args );
 270                  return;
 271              }
 272          }
 273          $this->critical( "Unreachable" );
 274          exit( 1 );
 275      }
 276  
 277      /**
 278       * Dispatch a command to a specified slave
 279       */
 280  	function dispatchToSlave( $slaveId, $args ) {
 281          $args = (array)$args;
 282          $cmd = implode( ' ', $args );
 283          fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
 284      }
 285  
 286      /**
 287       * Move all tracked pages to the new clusters
      *
      * Reads the distinct bt_page values that still have unmoved rows, in
      * batches of batchSize, and dispatches one 'doPage' command per page to
      * the workers. Progress is passed to report() after every batch.
 288       */
 289  	function doAllPages() {
 290          $dbr = wfGetDB( DB_SLAVE );
          // $i counts dispatched pages; $startId is the last page ID already queued
 291          $i = 0;
 292          $startId = 0;
 293          if ( $this->noCount ) {
 294              $numPages = '[unknown]';
 295          } else {
 296              $numPages = $dbr->selectField( 'blob_tracking',
 297                  'COUNT(DISTINCT bt_page)',
 298                  # A condition is required so that this query uses the index
 299                  array( 'bt_moved' => 0 ),
 300                  __METHOD__
 301              );
 302          }
 303          if ( $this->copyOnly ) {
 304              $this->info( "Copying pages..." );
 305          } else {
 306              $this->info( "Moving pages..." );
 307          }
 308          while ( true ) {
              // Next batch: page IDs greater than the last one seen, in ascending order
 309              $res = $dbr->select( 'blob_tracking',
 310                  array( 'bt_page' ),
 311                  array(
 312                      'bt_moved' => 0,
 313                      'bt_page > ' . $dbr->addQuotes( $startId )
 314                  ),
 315                  __METHOD__,
 316                  array(
 317                      'DISTINCT',
 318                      'ORDER BY' => 'bt_page',
 319                      'LIMIT' => $this->batchSize,
 320                  )
 321              );
 322              if ( !$res->numRows() ) {
 323                  break;
 324              }
 325              foreach ( $res as $row ) {
 326                  $this->dispatch( 'doPage', $row->bt_page );
 327                  $i++;
 328              }
              // $row still holds the last row of the batch here
 329              $startId = $row->bt_page;
 330              $this->report( 'pages', $i, $numPages );
 331          }
 332          $this->report( 'pages', $i, $numPages );
 333          if ( $this->copyOnly ) {
 334              $this->info( "All page copies queued." );
 335          } else {
 336              $this->info( "All page moves queued." );
 337          }
 338      }
 339  
 340      /**
 341       * Display a progress report
 342       */
 343  	function report( $label, $current, $end ) {
 344          $this->numBatches++;
 345          if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
 346              $this->numBatches = 0;
 347              $this->info( "$label: $current / $end" );
 348              $this->waitForSlaves();
 349          }
 350      }
 351  
 352      /**
 353       * Move all orphan text to the new clusters
      *
      * Orphans are the unmoved blob_tracking rows with bt_page = 0. Their text
      * IDs are read in batches of batchSize and sent to the workers as
      * 'doOrphanList' commands carrying at most orphanBatchSize IDs each.
      * Returns without doing anything when counting is enabled and the count
      * is zero.
 354       */
 355  	function doAllOrphans() {
 356          $dbr = wfGetDB( DB_SLAVE );
          // $startId is the last text ID already queued; $i counts queued orphans
 357          $startId = 0;
 358          $i = 0;
 359          if ( $this->noCount ) {
 360              $numOrphans = '[unknown]';
 361          } else {
 362              $numOrphans = $dbr->selectField( 'blob_tracking',
 363                  'COUNT(DISTINCT bt_text_id)',
 364                  array( 'bt_moved' => 0, 'bt_page' => 0 ),
 365                  __METHOD__ );
 366              if ( !$numOrphans ) {
 367                  return;
 368              }
 369          }
 370          if ( $this->copyOnly ) {
 371              $this->info( "Copying orphans..." );
 372          } else {
 373              $this->info( "Moving orphans..." );
 374          }
 375  
 376          while ( true ) {
              // Next batch: text IDs greater than the last one seen, in ascending order
 377              $res = $dbr->select( 'blob_tracking',
 378                  array( 'bt_text_id' ),
 379                  array(
 380                      'bt_moved' => 0,
 381                      'bt_page' => 0,
 382                      'bt_text_id > ' . $dbr->addQuotes( $startId )
 383                  ),
 384                  __METHOD__,
 385                  array(
 386                      'DISTINCT',
 387                      'ORDER BY' => 'bt_text_id',
 388                      'LIMIT' => $this->batchSize
 389                  )
 390              );
 391              if ( !$res->numRows() ) {
 392                  break;
 393              }
 394              $ids = array();
 395              foreach ( $res as $row ) {
 396                  $ids[] = $row->bt_text_id;
 397                  $i++;
 398              }
 399              // Need to send enough orphan IDs to the child at a time to fill a blob,
 400              // so orphanBatchSize needs to be at least ~100.
 401              // batchSize can be smaller or larger.
 402              while ( count( $ids ) > $this->orphanBatchSize ) {
 403                  $args = array_slice( $ids, 0, $this->orphanBatchSize );
 404                  $ids = array_slice( $ids, $this->orphanBatchSize );
 405                  array_unshift( $args, 'doOrphanList' );
 406                  call_user_func_array( array( $this, 'dispatch' ), $args );
 407              }
              // Send whatever is left (at most orphanBatchSize IDs)
 408              if ( count( $ids ) ) {
 409                  $args = $ids;
 410                  array_unshift( $args, 'doOrphanList' );
 411                  call_user_func_array( array( $this, 'dispatch' ), $args );
 412              }
 413  
              // $row still holds the last row of the batch here
 414              $startId = $row->bt_text_id;
 415              $this->report( 'orphans', $i, $numOrphans );
 416          }
 417          $this->report( 'orphans', $i, $numOrphans );
 418          $this->info( "All orphans queued." );
 419      }
 420  
 421      /**
 422       * Main entry point for worker processes
      *
      * Reads space-separated commands from STDIN, one per line, until end of
      * input or a 'quit' command:
      *  - doPage <page id>
      *  - doOrphanList <text id> [<text id> ...]
      *  - quit
      * Empty lines are skipped. Any other command word matches no case, so
      * nothing is executed for it.
 423       */
 424  	function executeChild() {
 425          $this->debug( 'starting' );
 426          $this->syncDBs();
 427  
 428          while ( !feof( STDIN ) ) {
 429              $line = rtrim( fgets( STDIN ) );
 430              if ( $line == '' ) {
 431                  continue;
 432              }
 433              $this->debug( $line );
              // First word is the command, the remaining words are its arguments
 434              $args = explode( ' ', $line );
 435              $cmd = array_shift( $args );
 436              switch ( $cmd ) {
 437              case 'doPage':
 438                  $this->doPage( intval( $args[0] ) );
 439                  break;
 440              case 'doOrphanList':
 441                  $this->doOrphanList( array_map( 'intval', $args ) );
 442                  break;
 443              case 'quit':
 444                  return;
 445              }
              // Let replication catch up before taking the next command
 446              $this->waitForSlaves();
 447          }
 448      }
 449  
 450      /**
 451       * Move tracked text in a given page
      *
      * Unless --copy-only is set, first completes moves of this page that
      * were copied but not yet written to the text table. Then reads the
      * page's remaining tracked text rows in batches, adds them to a
      * CgzCopyTransaction and commits that transaction whenever addItem()
      * returns false, and once more at the end.
      *
      * @param int $pageId
 452       */
 453  	function doPage( $pageId ) {
          // The title is only used to label debug messages
 454          $title = Title::newFromId( $pageId );
 455          if ( $title ) {
 456              $titleText = $title->getPrefixedText();
 457          } else {
 458              $titleText = '[deleted]';
 459          }
 460          $dbr = wfGetDB( DB_SLAVE );
 461  
 462          // Finish any incomplete transactions
 463          if ( !$this->copyOnly ) {
 464              $this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
 465              $this->syncDBs();
 466          }
 467  
 468          $startId = 0;
 469          $trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
 470  
 471          while ( true ) {
              // Join blob_tracking with text; only rows not yet copied
              // (bt_new_url IS NULL) and not yet moved are selected
 472              $res = $dbr->select(
 473                  array( 'blob_tracking', 'text' ),
 474                  '*',
 475                  array(
 476                      'bt_page' => $pageId,
 477                      'bt_text_id > ' . $dbr->addQuotes( $startId ),
 478                      'bt_moved' => 0,
 479                      'bt_new_url IS NULL',
 480                      'bt_text_id=old_id',
 481                  ),
 482                  __METHOD__,
 483                  array(
 484                      'ORDER BY' => 'bt_text_id',
 485                      'LIMIT' => $this->batchSize
 486                  )
 487              );
 488              if ( !$res->numRows() ) {
 489                  break;
 490              }
 491  
 492              $lastTextId = 0;
 493              foreach ( $res as $row ) {
                  // Rows are ordered by bt_text_id, so repeats of a text ID are adjacent
 494                  if ( $lastTextId == $row->bt_text_id ) {
 495                      // Duplicate (null edit)
 496                      continue;
 497                  }
 498                  $lastTextId = $row->bt_text_id;
 499                  // Load the text
 500                  $text = Revision::getRevisionText( $row );
 501                  if ( $text === false ) {
 502                      $this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
 503                      continue;
 504                  }
 505  
 506                  // Queue it
                  // addItem() returning false means the blob should be committed now
 507                  if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
 508                      $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
 509                      $trx->commit();
 510                      $trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
 511                      $this->waitForSlaves();
 512                  }
 513              }
              // $row still holds the last row of the batch here
 514              $startId = $row->bt_text_id;
 515          }
 516  
          // Commit whatever is left; commit() does nothing for an empty transaction
 517          $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
 518          $trx->commit();
 519      }
 520  
 521      /**
 522       * Atomic move operation.
 523       *
 524       * Write the new URL to the text table and set the bt_moved flag.
 525       *
 526       * This is done in a single transaction to provide restartable behavior
 527       * without data loss.
 528       *
 529       * The transaction is kept short to reduce locking.
      *
      * Must not be called in --copy-only mode; doing so logs an internal
      * error and exits with status 1.
      *
      * @param int $textId old_id of the text row to repoint
      * @param string $url New value for old_text
 530       */
 531  	function moveTextRow( $textId, $url ) {
 532          if ( $this->copyOnly ) {
 533              $this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
 534              exit( 1 );
 535          }
 536          $dbw = wfGetDB( DB_MASTER );
 537          $dbw->begin( __METHOD__ );
          // Point the text row at the new location ...
 538          $dbw->update( 'text',
 539              array( // set
 540                  'old_text' => $url,
 541                  'old_flags' => 'external,utf-8',
 542              ),
 543              array( // where
 544                  'old_id' => $textId
 545              ),
 546              __METHOD__
 547          );
          // ... and mark every tracking row for this text ID as moved
 548          $dbw->update( 'blob_tracking',
 549              array( 'bt_moved' => 1 ),
 550              array( 'bt_text_id' => $textId ),
 551              __METHOD__
 552          );
 553          $dbw->commit( __METHOD__ );
 554      }
 555  
 556      /**
 557       * Moves are done in two phases: bt_new_url and then bt_moved.
 558       *  - bt_new_url indicates that the text has been copied to the new cluster.
 559       *  - bt_moved indicates that the text table has been updated.
 560       *
 561       * This function completes any moves that only have done bt_new_url. This
 562       * can happen when the script is interrupted, or when --copy-only is used.
      *
      * @param array $conds Extra query conditions selecting the blob_tracking
      *   rows to look at (callers pass a bt_page or bt_text_id condition)
 563       */
 564  	function finishIncompleteMoves( $conds ) {
 565          $dbr = wfGetDB( DB_SLAVE );
 566  
 567          $startId = 0;
          // Restrict to rows that were copied (URL set) but not yet moved
 568          $conds = array_merge( $conds, array(
 569              'bt_moved' => 0,
 570              'bt_new_url IS NOT NULL'
 571          ) );
 572          while ( true ) {
 573              $res = $dbr->select( 'blob_tracking',
 574                  '*',
 575                  array_merge( $conds, array( 'bt_text_id > ' . $dbr->addQuotes( $startId ) ) ),
 576                  __METHOD__,
 577                  array(
 578                      'ORDER BY' => 'bt_text_id',
 579                      'LIMIT' => $this->batchSize,
 580                  )
 581              );
 582              if ( !$res->numRows() ) {
 583                  break;
 584              }
 585              $this->debug( 'Incomplete: ' . $res->numRows() . ' rows' );
 586              foreach ( $res as $row ) {
 587                  $this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
                  // Check replication lag whenever the text ID is a multiple of 10
 588                  if ( $row->bt_text_id % 10 == 0 ) {
 589                      $this->waitForSlaves();
 590                  }
 591              }
              // $row still holds the last row of the batch here
 592              $startId = $row->bt_text_id;
 593          }
 594      }
 595  
 596      /**
 597       * Returns the name of the next target cluster
 598       * @return string
 599       */
 600  	function getTargetCluster() {
 601          $cluster = next( $this->destClusters );
 602          if ( $cluster === false ) {
 603              $cluster = reset( $this->destClusters );
 604          }
 605          return $cluster;
 606      }
 607  
 608      /**
 609       * Gets a DB master connection for the given external cluster name
 610       * @param $cluster string
 611       * @return DatabaseBase
 612       */
 613  	function getExtDB( $cluster ) {
 614          $lb = wfGetLBFactory()->getExternalLB( $cluster );
 615          return $lb->getConnection( DB_MASTER );
 616      }
 617  
 618      /**
 619       * Move a list of orphan text IDs to the new cluster
      *
      * Unless --copy-only is set, first completes moves of these text IDs that
      * were copied but not yet written to the text table. The remaining
      * unmoved texts are added to a CgzCopyTransaction, which is committed
      * whenever addItem() returns false, and once more at the end.
      *
      * @param array $textIds
 620       */
 621  	function doOrphanList( $textIds ) {
 622          // Finish incomplete moves
 623          if ( !$this->copyOnly ) {
 624              $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
 625              $this->syncDBs();
 626          }
 627  
 628          $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
 629  
          // DISTINCT collapses rows that only differ in blob_tracking columns,
          // since only text table columns are selected
 630          $res = wfGetDB( DB_SLAVE )->select(
 631              array( 'text', 'blob_tracking' ),
 632              array( 'old_id', 'old_text', 'old_flags' ),
 633              array(
 634                  'old_id' => $textIds,
 635                  'bt_text_id=old_id',
 636                  'bt_moved' => 0,
 637              ),
 638              __METHOD__,
 639              array( 'DISTINCT' )
 640          );
 641  
 642          foreach ( $res as $row ) {
 643              $text = Revision::getRevisionText( $row );
 644              if ( $text === false ) {
 645                  $this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
 646                  continue;
 647              }
 648  
              // addItem() returning false means the blob should be committed now
 649              if ( !$trx->addItem( $text, $row->old_id ) ) {
 650                  $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
 651                  $trx->commit();
 652                  $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
 653                  $this->waitForSlaves();
 654              }
 655          }
          // Commit whatever is left; commit() does nothing for an empty transaction
 656          $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
 657          $trx->commit();
 658      }
 659  
 660      /**
 661       * Wait for slaves (quietly)
 662       */
 663  	function waitForSlaves() {
 664          $lb = wfGetLB();
 665          while ( true ) {
 666              list( $host, $maxLag ) = $lb->getMaxLag();
 667              if ( $maxLag < 2 ) {
 668                  break;
 669              }
 670              sleep( 5 );
 671          }
 672      }
 673  }
 674  
 675  /**
 676   * Class to represent a recompression operation for a single CGZ blob
 677   */
 678  class CgzCopyTransaction {
 679      public $parent;
 680      public $blobClass;
 681      public $cgz;
 682      public $referrers;
 683  
 684      /**
 685       * Create a transaction from a RecompressTracked object
 686       */
 687  	function __construct( $parent, $blobClass ) {
 688          $this->blobClass = $blobClass;
 689          $this->cgz = false;
 690          $this->texts = array();
 691          $this->parent = $parent;
 692      }
 693  
 694      /**
 695       * Add text.
 696       * Returns false if it's ready to commit.
 697       * @param $text string
 698       * @param $textId
 699       * @return bool
 700       */
 701  	function addItem( $text, $textId ) {
 702          if ( !$this->cgz ) {
 703              $class = $this->blobClass;
 704              $this->cgz = new $class;
 705          }
 706          $hash = $this->cgz->addItem( $text );
 707          $this->referrers[$textId] = $hash;
 708          $this->texts[$textId] = $text;
 709          return $this->cgz->isHappy();
 710      }
 711  
 712  	function getSize() {
 713          return count( $this->texts );
 714      }
 715  
 716      /**
 717       * Recompress text after some aberrant modification
 718       */
 719  	function recompress() {
 720          $class = $this->blobClass;
 721          $this->cgz = new $class;
 722          $this->referrers = array();
 723          foreach ( $this->texts as $textId => $text ) {
 724              $hash = $this->cgz->addItem( $text );
 725              $this->referrers[$textId] = $hash;
 726          }
 727      }
 728  
 729      /**
 730       * Commit the blob.
 731       * Does nothing if no text items have been added.
 732       * May skip the move if --copy-only is set.
 733       */
 734  	function commit() {
 735          $originalCount = count( $this->texts );
 736          if ( !$originalCount ) {
 737              return;
 738          }
 739  
 740          // Check to see if the target text_ids have been moved already.
 741          //
 742          // We originally read from the slave, so this can happen when a single
 743          // text_id is shared between multiple pages. It's rare, but possible
 744          // if a delete/move/undelete cycle splits up a null edit.
 745          //
 746          // We do a locking read to prevent closer-run race conditions.
 747          $dbw = wfGetDB( DB_MASTER );
 748          $dbw->begin( __METHOD__ );
 749          $res = $dbw->select( 'blob_tracking',
 750              array( 'bt_text_id', 'bt_moved' ),
 751              array( 'bt_text_id' => array_keys( $this->referrers ) ),
 752              __METHOD__, array( 'FOR UPDATE' ) );
 753          $dirty = false;
 754          foreach ( $res as $row ) {
 755              if ( $row->bt_moved ) {
 756                  # This row has already been moved, remove it
 757                  $this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
 758                  unset( $this->texts[$row->bt_text_id] );
 759                  $dirty = true;
 760              }
 761          }
 762  
 763          // Recompress the blob if necessary
 764          if ( $dirty ) {
 765              if ( !count( $this->texts ) ) {
 766                  // All have been moved already
 767                  if ( $originalCount > 1 ) {
 768                      // This is suspcious, make noise
 769                      $this->critical( "Warning: concurrent operation detected, are there two conflicting " .
 770                          "processes running, doing the same job?" );
 771                  }
 772                  return;
 773              }
 774              $this->recompress();
 775          }
 776  
 777          // Insert the data into the destination cluster
 778          $targetCluster = $this->parent->getTargetCluster();
 779          $store = $this->parent->store;
 780          $targetDB = $store->getMaster( $targetCluster );
 781          $targetDB->clearFlag( DBO_TRX ); // we manage the transactions
 782          $targetDB->begin( __METHOD__ );
 783          $baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );
 784  
 785          // Write the new URLs to the blob_tracking table
 786          foreach ( $this->referrers as $textId => $hash ) {
 787              $url = $baseUrl . '/' . $hash;
 788              $dbw->update( 'blob_tracking',
 789                  array( 'bt_new_url' => $url ),
 790                  array(
 791                      'bt_text_id' => $textId,
 792                      'bt_moved' => 0, # Check for concurrent conflicting update
 793                  ),
 794                  __METHOD__
 795              );
 796          }
 797  
 798          $targetDB->commit( __METHOD__ );
 799          // Critical section here: interruption at this point causes blob duplication
 800          // Reversing the order of the commits would cause data loss instead
 801          $dbw->commit( __METHOD__ );
 802  
 803          // Write the new URLs to the text table and set the moved flag
 804          if ( !$this->parent->copyOnly ) {
 805              foreach ( $this->referrers as $textId => $hash ) {
 806                  $url = $baseUrl . '/' . $hash;
 807                  $this->parent->moveTextRow( $textId, $url );
 808              }
 809          }
 810      }
 811  }

title

Description

title

Description

title

Description

title

title

Body