File "Queue.php"

Full Path: /home/romayxjt/public_html/wp-content/plugins/the-events-calendar/src/Tribe/Aggregator/Record/Queue.php
File size: 11.48 KB
MIME-type: text/x-php
Charset: utf-8

<?php
// Don't load directly
defined( 'WPINC' ) or die;

class Tribe__Events__Aggregator__Record__Queue implements Tribe__Events__Aggregator__Record__Queue_Interface {
	public static $in_progress_key = 'tribe_aggregator_queue_';
	public static $queue_key = 'queue';
	public static $activity_key = 'activity';

	/**
	 * @var Tribe__Events__Aggregator__Record__Abstract
	 */
	public $record;

	public $is_fetching = false;
	protected $importer;

	/**
	 * @var Tribe__Events__Aggregator__Record__Activity
	 */
	protected $activity;

	/**
	 * Holds the Items that will be processed
	 *
	 * @var array
	 */
	public $items = [];

	/**
	 * Holds the Items that will be processed next
	 *
	 * @var array
	 */
	public $next = [];

	/**
	 * How many items are going to be processed
	 *
	 * @var int
	 */
	public $total = 0;

	/**
	 * @var Tribe__Events__Aggregator__Record__Queue_Cleaner
	 */
	protected $cleaner;

	/**
	 * Whether any real processing should happen for the queue or not.
	 *
	 * @var bool
	 */
	protected $null_process = false;

	/**
	 * @var bool Whether this queue instance has acquired the lock or not.
	 */
	protected $has_lock = false;

	/**
	 * Tribe__Events__Aggregator__Record__Queue constructor.
	 *
	 * @param int|Tribe__Events__Aggregator__Record__Abstract       $record
	 * @param array                                                 $items
	 * @param Tribe__Events__Aggregator__Record__Queue_Cleaner|null $cleaner
	 */
	public function __construct( $record, $items = [], Tribe__Events__Aggregator__Record__Queue_Cleaner $cleaner = null ) {
		tribe( 'chunker' );

		if ( is_numeric( $record ) ) {
			$record = Tribe__Events__Aggregator__Records::instance()->get_by_post_id( $record );
		}

		if ( ! is_object( $record ) || ! $record instanceof \Tribe__Events__Aggregator__Record__Abstract ) {
			$this->null_process = true;

			return;
		}

		if ( is_wp_error( $items ) ) {
			$this->null_process = true;

			return;
		}

		$this->cleaner = $cleaner ? $cleaner : new Tribe__Events__Aggregator__Record__Queue_Cleaner();

		$this->cleaner->remove_duplicate_pending_records_for( $record );

		$failed = $this->cleaner->maybe_fail_stalled_record( $record );

		if ( $failed ) {
			$this->null_process = true;

			return;
		}

		$this->record = $record;
		$this->activity();

		if ( ! empty( $items ) ) {
			if ( 'fetch' === $items ) {
				$this->is_fetching = true;
				$this->items       = 'fetch';
			} else {
				$this->init_queue( $items );
			}

			$this->save();
		} else {
			$this->load_queue();
		}
		$this->cleaner = $cleaner;
	}

	public function __get( $key ) {
		switch ( $key ) {
			case 'activity':
				return $this->activity();
				break;
		}
	}

	protected function init_queue( $items ) {
		if ( 'csv' === $this->record->origin ) {
			$this->record->reset_tracking_options();
			$this->importer = $items;
			$this->total    = $this->importer->get_line_count();
			$this->items    = array_fill( 0, $this->total, true );
		} else {
			$this->items = $items;

			// Count the Total of items now and stores as the total
			$this->total = count( $this->items );
		}
	}

	protected function load_queue() {
		if ( empty( $this->record->meta[ self::$queue_key ] ) ) {
			$this->is_fetching = false;
			$this->items       = [];
		} else {
			$this->items = $this->record->meta[ self::$queue_key ];
		}

		if ( 'fetch' === $this->items ) {
			$this->is_fetching = true;
		}
	}

	public function activity() {
		if ( empty( $this->activity ) ) {
			if (
				empty( $this->record->meta[ self::$activity_key ] )
				|| ! $this->record->meta[ self::$activity_key ] instanceof Tribe__Events__Aggregator__Record__Activity
			) {
				$this->activity = new Tribe__Events__Aggregator__Record__Activity;
			} else {
				$this->activity = $this->record->meta[ self::$activity_key ];
			}
		}

		return $this->activity;
	}

	/**
	 * Allows us to check if the Events Data has still pending
	 *
	 * @return boolean
	 */
	public function is_fetching() {
		return $this->is_fetching;
	}

	/**
	 * Shortcut to check how many items are going to be processed next
	 *
	 * @return int
	 */
	public function count() {
		return is_array( $this->items ) ? count( $this->items ) : 0;
	}

	/**
	 * Shortcut to check if this queue is empty or it has a null process.
	 *
	 * @return boolean `true` if this queue instance has acquired the lock and
	 *                 the count is 0, `false` otherwise.
	 */
	public function is_empty() {
		if ( $this->null_process ) {
			return true;
		}

		return $this->has_lock && 0 === $this->count();
	}

	/**
	 * Gets the queue's total
	 *
	 * @return int
	 */
	protected function get_total() {
		return $this->count() + $this->activity->count( $this->get_queue_type() );
	}

	/**
	 * Saves queue data to relevant meta keys on the post
	 *
	 * @return self
	 */
	protected function save() {
		$this->record->update_meta( self::$activity_key, $this->activity );

		/** @var Tribe__Meta__Chunker $chunker */
		$chunker = tribe( 'chunker' );
		// this data has the potential to be very big, so we register it for possible chunking in the db
		$key = Tribe__Events__Aggregator__Record__Abstract::$meta_key_prefix . self::$queue_key;
		$chunker->register_chunking_for( $this->record->post->ID, $key );

		if ( empty( $this->items ) ) {
			$this->record->delete_meta( self::$queue_key );
		} else {
			$this->record->update_meta( self::$queue_key, $this->items );
		}

		// If we have a parent also update that
		if ( ! empty( $this->record->post->post_parent ) ) {
			$parent = Tribe__Events__Aggregator__Records::instance()->get_by_post_id( $this->record->post->post_parent );

			if ( ! tribe_is_error( $parent ) && isset( $parent->meta[ self::$activity_key ] ) ) {
				$activity = $parent->meta[ self::$activity_key ];

				if ( $activity instanceof Tribe__Events__Aggregator__Record__Activity ) {
					$parent->update_meta( self::$activity_key, $activity->merge( $this->activity ) );
				}
			}
		}

		// Updates the Modified time for the Record Log
		$args = [
			'ID'            => $this->record->post->ID,
			'post_modified' => date( Tribe__Date_Utils::DBDATETIMEFORMAT, current_time( 'timestamp' ) ),
		];

		if ( empty( $this->items ) ) {
			$args['post_status'] = Tribe__Events__Aggregator__Records::$status->success;
		}

		wp_update_post( $args );

		$this->release_lock();

		return $this;
	}

	/**
	 * Processes a batch for the queue
	 *
	 * @return self|Tribe__Events__Aggregator__Record__Activity
	 */
	public function process( $batch_size = null ) {
		if ( $this->null_process ) {
			return $this;
		}

		$this->has_lock = $this->acquire_lock();

		if ( $this->has_lock ) {
			if ( $this->is_fetching() ) {
				if ( $this->record->should_queue_import() ) {
					$response = $this->record->queue_import();

					if ( $response instanceof WP_Error ) {
						// the import queueing generated an error
						$this->record->set_status_as_failed( $response );

						return $this;
					}

					if ( is_numeric( $response ) ) {
						// the import queueing was rescheduled
						$this->record->set_status_as_pending();

						return $this;
					}
				}

				$data = $this->record->prep_import_data();

				if (
					'fetch' === $data
					|| ! is_array( $data )
					|| is_wp_error( $data )
				) {
					$this->release_lock();
					$this->is_fetching = false;

					return $this->activity();
				}

				$this->init_queue( $data );
				$this->save();
			}


			if ( ! $batch_size ) {
				$batch_size = apply_filters( 'tribe_aggregator_batch_size', Tribe__Events__Aggregator__Record__Queue_Processor::$batch_size );
			}

			/*
			 * If the queue system is switched mid-imports this might happen.
			 * In that case we conservatively stop (kill) the queue process.
			 */
			if ( ! is_array( $this->items ) ) {
				$this->kill_queue();

				return $this;
			}

			// Every time we are about to process we reset the next var
			$this->next = array_splice( $this->items, 0, $batch_size );

			if ( 'csv' === $this->record->origin ) {
				$activity = $this->record->continue_import();
			} else {
				$activity = $this->record->insert_posts( $this->next );
			}

			$this->activity = $this->activity()->merge( $activity );
		} else {
			// this queue instance should not register any new activity
			$this->activity = $this->activity();
		}

		return $this->save();
	}

	/**
	 * Returns the total progress made on processing the queue so far as a percentage.
	 *
	 * @return int
	 */
	public function progress_percentage() {
		if ( 0 === $this->count() ) {
			return 0;
		}

		$total     = $this->get_total();
		$processed = $total - $this->count();
		$percent   = ( $processed / $total ) * 100;
		return (int) $percent;
	}

	/**
	 * Sets a flag to indicate that update work is in progress for a specific event:
	 * this can be useful to prevent collisions between cron-based updated and realtime
	 * updates.
	 *
	 * The flag naturally expires after an hour to allow for recovery if for instance
	 * execution hangs half way through the processing of a batch.
	 */
	public function set_in_progress_flag() {
		if ( empty( $this->record->id ) ) {
			return;
		}

		Tribe__Post_Transient::instance()->set( $this->record->id, self::$in_progress_key, true, HOUR_IN_SECONDS );
	}

	/**
	 * Clears the in progress flag.
	 */
	public function clear_in_progress_flag() {
		if ( empty( $this->record->id ) ) {
			return;
		}

		Tribe__Post_Transient::instance()->delete( $this->record->id, self::$in_progress_key );
	}

	/**
	 * Indicates if the queue for the current event is actively being processed.
	 *
	 * @return bool
	 */
	public function is_in_progress() {
		if ( empty( $this->record->id ) ) {
			return false;
		}

		Tribe__Post_Transient::instance()->get( $this->record->id, self::$in_progress_key );
	}

	/**
	 * Returns the primary post type the queue is processing
	 *
	 * @return string
	 */
	public function get_queue_type() {
		$item_type = Tribe__Events__Main::POSTTYPE;

		if ( ! empty( $this->record->origin ) && 'csv' === $this->record->origin ) {
			$item_type = $this->record->meta['content_type'];
		}

		return $item_type;
	}

	/**
	 * Acquires the global (db stored) queue lock if available.
	 *
	 * @since 4.5.12
	 *
	 * @return bool Whether the lock could be acquired or not if another instance/process has
	 *              already acquired the lock.
	 */
	protected function acquire_lock() {
		if ( empty( $this->record->post->ID ) ) {
			return false;
		}

		$post_id = $this->record->post->ID;

		$post_transient = Tribe__Post_Transient::instance();

		$locked = $post_transient->get( $post_id, 'aggregator_queue_lock' );

		if ( ! empty( $locked ) ) {
			return false;
		}

		$post_transient->set( $post_id, 'aggregator_queue_lock', '1', 180 );

		return true;
	}

	/**
	 * Release the queue lock if this instance of the queue holds it.
	 *
	 * @since 4.5.12
	 *
	 * @return bool
	 */
	protected function release_lock() {
		if ( empty( $this->record->post->ID ) || ! $this->has_lock ) {
			return false;
		}

		$post_id = $this->record->post->ID;

		$post_transient = Tribe__Post_Transient::instance();

		$post_transient->delete( $post_id, 'aggregator_queue_lock' );

		return true;
	}

	/**
	 * Whether the current queue process is stuck or not.
	 *
	 * @since 4.6.21
	 *
	 * @return mixed
	 */
	public function is_stuck() {
		return false;
	}

	/**
	 * Orderly closes the queue process.
	 *
	 * @since 4.6.21
	 *
	 * @return bool
	 */
	public function kill_queue() {
		return true;
	}

	/**
	 * Whether the current queue process failed or not.
	 *
	 * @since 4.6.21
	 *
	 * @return bool
	 */
	public function has_errors() {
		return false;
	}

	/**
	 * Returns the queue error message.
	 *
	 * @since 4.6.21
	 *
	 * @return string
	 */
	public function get_error_message() {
		return '';
	}
}