File "Import_Events.php"

Full Path: /home/romayxjt/public_html/wp-content/plugins/the-events-calendar/src/Tribe/Aggregator/Processes/Import_Events.php
File size: 12.99 KB
MIME-type: text/x-php
Charset: utf-8

<?php

/**
 * Class Tribe__Events__Aggregator__Processes__Import_Events
 *
 * Imports events in an async queue.
 *
 * @since 4.6.16
 */
class Tribe__Events__Aggregator__Processes__Import_Events extends Tribe__Process__Queue {
	/**
	 * @var string
	 */
	protected $transitional_id;

	/**
	 * @var int The post ID of the record associated to this queue instance.
	 */
	protected $record_id;

	/**
	 * @var bool Whether the current item has dependencies or not.
	 */
	protected $has_dependencies = true;

	/**
	 * @var Tribe__Events__Aggregator__Record__Activity[]
	 */
	protected $activities = [];

	/**
	 * @var int The maximum number of times an item should be requeued due to unmet dependencies.
	 */
	protected $requeue_limit = 5;

	/**
	 * Provides the name of the action this async process is registered under.
	 *
	 * @since 4.6.16
	 *
	 * @return string The action name, `ea_import_events`.
	 */
	public static function action() {
		$action_name = 'ea_import_events';

		return $action_name;
	}

	/**
	 * Builds the queue through the parent constructor, then resolves the requeue limit
	 * by running the default value through its filter.
	 */
	public function __construct() {
		parent::__construct();

		$default_limit = $this->requeue_limit;

		/**
		 * Filters how many times an item can be requeued due to unmet dependencies.
		 *
		 * This is a work-around for circular dependencies: higher numbers mean more safety and
		 * more processing time, smaller numbers mean less safety but reduced processing time.
		 *
		 * @param int $requeue_limit
		 * @param Tribe__Events__Aggregator__Processes__Import_Events $this
		 */
		$filtered_limit = apply_filters( 'tribe_aggregator_import_process_requeue_limit', $default_limit, $this );

		$this->requeue_limit = $filtered_limit;
	}

	/**
	 * Copies the Event Aggregator global ID of each post linked to an event into this
	 * import transitional meta key, so that dependency checks can later find it.
	 *
	 * The venue is read from `EventVenueID`, the organizers from `Organizer.OrganizerID`;
	 * organizers are only processed when that entry is an array.
	 *
	 * @since 4.6.16
	 *
	 * @param array $event The event data.
	 */
	public function add_transitional_data( array $event ) {
		$meta_key        = $this->get_transitional_meta_key();
		$linked_post_ids = [];

		$venue_id      = Tribe__Utils__Array::get( $event, 'EventVenueID', false );
		$organizer_ids = Tribe__Utils__Array::get( $event, [ 'Organizer', 'OrganizerID' ], false );

		if ( false !== $venue_id ) {
			$linked_post_ids[] = $venue_id;
		}

		if ( is_array( $organizer_ids ) ) {
			foreach ( $organizer_ids as $organizer_id ) {
				$linked_post_ids[] = $organizer_id;
			}
		}

		// Venue first, then organizers: each gets its global ID mirrored on the transitional key.
		foreach ( $linked_post_ids as $linked_post_id ) {
			$global_id = get_post_meta( $linked_post_id, '_tribe_aggregator_global_id', true );
			update_post_meta( $linked_post_id, $meta_key, $global_id );
		}
	}

	/**
	 * Builds the `meta_key` under which transitional data is stored on linked posts
	 * for this import process.
	 *
	 * @since 4.6.16
	 *
	 * @param string|null $transitional_id The final part of the key; when `null` the
	 *                                     transitional ID set on this instance is used.
	 *
	 * @return string The `_tribe_import_` prefix followed by the transitional ID.
	 */
	public function get_transitional_meta_key( $transitional_id = null ) {
		$suffix = null === $transitional_id ? $this->transitional_id : $transitional_id;

		return "_tribe_import_{$suffix}";
	}

	/**
	 * Stores the transitional ID: the final part of the `meta_key` used to store
	 * transitional information for this import process.
	 *
	 * @since 4.6.16
	 *
	 * @param string $transitional_id The value appended to the `_tribe_import_` prefix.
	 */
	public function set_transitional_id( $transitional_id ) {
		$this->transitional_id = $transitional_id;
	}

	/**
	 * Hooks `save_data` on this queue save-data filter, then delegates to the parent `save`.
	 *
	 * @since 4.6.16
	 *
	 * @return Tribe__Events__Aggregator__Processes__Import_Events The value returned by the parent `save` method.
	 */
	public function save() {
		$filter_tag = "tribe_process_queue_{$this->identifier}_save_data";

		add_filter( $filter_tag, [ $this, 'save_data' ] );

		return parent::save();
	}

	/**
	 * Hooks `save_data` on this queue update-data filter, then delegates to the parent `update`.
	 *
	 * @since 4.6.16
	 *
	 * @param mixed $key  Passed through, unchanged, to the parent `update` method.
	 * @param mixed $data Passed through, unchanged, to the parent `update` method.
	 *
	 * @return Tribe__Events__Aggregator__Processes__Import_Events The value returned by the parent `update` method.
	 */
	public function update( $key, $data ) {
		$filter_tag = "tribe_process_queue_{$this->identifier}_update_data";

		add_filter( $filter_tag, [ $this, 'save_data' ] );

		return parent::update( $key, $data );
	}

	/**
	 * Adds this process record ID to the data being saved, under the `record_id` key.
	 *
	 * @since 4.6.16
	 *
	 * @param array $save_data The data about to be saved.
	 *
	 * @return array The same data with `record_id` set to this process record ID.
	 */
	public function save_data( array $save_data = [] ) {
		$with_record_id              = $save_data;
		$with_record_id['record_id'] = $this->record_id;

		return $with_record_id;
	}

	/**
	 * Gets the post ID of the record associated to this import process.
	 *
	 * @since 4.6.16
	 *
	 * @return int The record post ID.
	 */
	public function get_record_id() {
		$record_id = $this->record_id;

		return $record_id;
	}

	/**
	 * Stores the post ID of the record associated to this import process.
	 *
	 * @since 4.6.16
	 *
	 * @param int $record_id The record post ID.
	 */
	public function set_record_id( $record_id ) {
		$this->record_id = $record_id;
	}

	/**
	 * Processes one queue item, importing the event it carries.
	 *
	 * The flow is:
	 * - a filter can replace the whole task;
	 * - if the item is running in a blog different from the one it was queued for, an error
	 *   is logged and the item is requeued untouched;
	 * - if the event has no dependencies, or the requeue limit was reached, it is inserted;
	 * - if the event has dependencies and all of them are in place, the linked post IDs are
	 *   set in the event data and the event is inserted;
	 * - otherwise the item requeue counter is increased and the item is requeued.
	 *
	 * @since 4.6.16
	 *
	 * @param array $item The queue item; `record_id`, `data` and `transitional_id` are read
	 *                    from it, `blog_id` and `requeued` are optional.
	 *
	 * @return array|false Either the event data to requeue or `false` if done; when
	 *                     `doing_sync` is set the insertion activity is returned in place
	 *                     of `false`.
	 */
	protected function task( $item ) {

		/**
		 * Allows replacing the event data import task completely.
		 *
		 * Returning a non `null` value here will replace the built in functionality with
		 * the one implemented by the filtering function.
		 *
		 * @since 4.6.19
		 *
		 * @param bool|null $done
		 * @param array|stdClass $item An object or array containing the raw data for this
		 *                             event.
		 */
		$replacement = apply_filters( 'tribe_aggregator_async_import_event_task', null, $item );
		if ( null !== $replacement ) {
			return $replacement;
		}

		$this->record_id       = $item['record_id'];
		$record_id             = $this->record_id;
		$data                  = (array) $item['data'];
		$this->transitional_id = tec_sanitize_string( $item['transitional_id'] );

		/*
		 * Make sure the import is happening in the context of the same site that started it.
		 * This deals with mishandling and orphaned calls to the `switch_to_blog` function.
		 */
		$current_blog_id = 1;
		if ( is_multisite() ) {
			$current_blog_id = get_current_blog_id();
		}

		// Items that do not carry a blog ID are assumed to belong to the current blog.
		$task_blog_id = $current_blog_id;
		if ( isset( $item['blog_id'] ) ) {
			$task_blog_id = (int) $item['blog_id'];
		}

		if ( $current_blog_id !== $task_blog_id ) {
			/*
			 * The blog context is not the expected one: log an error and requeue the task.
			 * The code deliberately does not switch to the task blog, to avoid potentially
			 * causing more issues.
			 */
			$message = sprintf(
				'Event Aggregator import task supposed to run in context of blog %d, running instead in blog %d: not importing.',
				$task_blog_id,
				$current_blog_id
			);

			/** @var Tribe__Log $logger */
			$logger = tribe( 'logger' );
			$logger->log_error( $message, 'Event Aggregator Import' );

			// Returning the item, untouched, indicates the task should be re-queued.
			return $item;
		}

		/*
		 * To avoid deadlocks when dealing with circular dependencies an item can be requeued
		 * only so many times: once the limit is reached the dependencies are not parsed at
		 * all and the event is inserted as if it had none.
		 */
		$dependencies      = [];
		$can_still_requeue = empty( $item['requeued'] ) || ( (int) $item['requeued'] < $this->requeue_limit );
		if ( $can_still_requeue ) {
			$dependencies = $this->parse_linked_post_dependencies( $data );
		}

		if ( empty( $dependencies ) ) {
			$this->has_dependencies = false;
		} else {
			$dependencies_ids = $this->check_dependencies( $dependencies );

			if ( ! $dependencies_ids ) {
				// Keep track of how many times the item was requeued due to unmet dependencies.
				$times_requeued   = isset( $item['requeued'] ) ? (int) $item['requeued'] : 0;
				$item['requeued'] = $times_requeued + 1;

				return $item;
			}

			$this->set_linked_posts_ids( $data, $dependencies_ids );
		}

		$activity           = $this->insert_event( $record_id, (object) $data );
		$this->activities[] = $activity;

		if ( $this->doing_sync ) {
			return $activity;
		}

		return false;
	}


	/**
	 * Reads the linked post dependencies from the `depends_on` entry of the event data.
	 *
	 * @since 4.6.16
	 *
	 * @param array $data The event data.
	 *
	 * @return array The values of the `depends_on` entry, re-indexed; an empty array when
	 *               the entry is missing or empty.
	 */
	protected function parse_linked_post_dependencies( $data ) {
		if ( empty( $data['depends_on'] ) ) {
			return [];
		}

		return array_values( (array) $data['depends_on'] );
	}

	/**
	 * Inserts an event.
	 *
	 * The record is fetched by its post ID and the event data is handed to its `insert_posts`
	 * method; the resulting activity is merged into the record activity and saved in the
	 * record `activity` meta.
	 * Exceptions thrown along the way are logged and produce an activity where the event is
	 * marked as `skipped`.
	 *
	 * @since 4.6.16
	 *
	 * @param int $record_id
	 * @param object $data
	 *
	 * @return Tribe__Events__Aggregator__Record__Activity|bool Either the resulting activity or `false`
	 *                                                          if the record could not be found. If an
	 *                                                          exception is thrown before a usable record
	 *                                                          is available, the "skipped" activity is
	 *                                                          returned without being merged into a record.
	 */
	protected function insert_event( $record_id, $data ) {
		// Defined before the `try` so that it is always set, even when fetching the record throws.
		$record = null;

		try {
			$record = $this->get_record( $record_id );

			/**
			 * Allows replacing the event data insertion completely.
			 *
			 * Returning a non `null` value here will replace the built in functionality with
			 * the one implemented by the filtering function.
			 *
			 * @since 4.6.19
			 *
			 * @param null|Tribe__Events__Aggregator__Record__Activity $activity The activity resulting
			 *                                                                   from the event insertion.
			 * @param Tribe__Events__Aggregator__Record__Abstract $record        The current import record
			 * @param array|stdClass $data                                       An object or array containing the raw data for this
			 *                                                                   event.
			 */
			$activity = apply_filters( 'tribe_aggregator_async_insert_event', null, $record, $data );
			if ( null !== $activity ) {
				return $activity;
			}

			if ( empty( $record ) || $record instanceof WP_Error ) {
				// no point in going on
				return false;
			}

			if ( ! $this->has_dependencies ) {
				add_action( 'tribe_aggregator_after_insert_post', [ $this, 'add_transitional_data' ] );
			}

			$activity = $record->insert_posts( [ $data ] );
		} catch ( Exception $e ) {
			/** @var Tribe__Log $logger */
			$logger = tribe( 'logger' );
			$logger->log_error(
				sprintf(
					"Error while importing an event for the record %d: %s\nData: %s",
					$record_id,
					$e->getMessage(),
					json_encode( $data )
				),
				'Event Aggregator Import'
			);
			$activity         = new Tribe__Events__Aggregator__Record__Activity();
			$data             = (array) $data;
			// Identify the skipped event by its global ID, falling back to the first data entry.
			$event_identifier = Tribe__Utils__Array::get( $data, 'global_id', reset( $data ) );
			$activity->add( 'event', 'skipped', [ $event_identifier ] );
		}

		/*
		 * Only reachable with an unusable record when the exception was thrown before the
		 * record was validated (while fetching it or from the filter): there is no record to
		 * merge the activity into, and calling methods on it would trigger a fatal error.
		 */
		if ( empty( $record ) || $record instanceof WP_Error ) {
			return $activity;
		}

		$record->activity()->merge( $activity );
		$record->update_meta( 'activity', $record->activity() );

		return $activity;
	}

	/**
	 * Fetches, from the Event Aggregator records repository, the record with the given post ID.
	 *
	 * @since 4.6.16
	 *
	 * @param int $record_id The record post ID.
	 *
	 * @return null|Tribe__Error|Tribe__Events__Aggregator__Record__Abstract
	 */
	protected function get_record( $record_id ) {
		/** @var Tribe__Events__Aggregator__Records $repository */
		$repository = tribe( 'events-aggregator.records' );

		return $repository->get_by_post_id( $record_id );
	}

	/**
	 * Checks the database to make sure all the dependencies are available.
	 *
	 * A dependency is available when a post exists whose transitional meta, for this import
	 * process, has the dependency identifier as value.
	 *
	 * @since 4.6.16
	 *
	 * @param array $dependencies A list of dependency identifiers; duplicates are counted once.
	 *
	 * @return array|bool `true` if there are no dependencies to check; a list of rows, each with
	 *                    a `post_id` and a `post_type` property, when the number of rows found
	 *                    matches the number of distinct dependencies; `false` otherwise.
	 */
	protected function check_dependencies( $dependencies ) {
		if ( empty( $dependencies ) ) {
			return true;
		}

		/** @var wpdb $wpdb */
		global $wpdb;

		/*
		 * The same identifier listed more than once still matches one row only: count each
		 * dependency once or the rows count could never match the dependencies count.
		 */
		$dependencies = array_values( array_unique( (array) $dependencies ) );

		// One `%s` placeholder per dependency, all prepared in a single statement.
		$placeholders = implode( ',', array_fill( 0, count( $dependencies ), '%s' ) );

		$query = $wpdb->prepare(
			"SELECT pm.post_id, p.post_type
			FROM {$wpdb->postmeta} pm
			JOIN {$wpdb->posts} p
			ON p.ID = pm.post_id
			WHERE meta_key = %s
			AND meta_value IN ({$placeholders})",
			array_merge( [ $this->get_transitional_meta_key() ], $dependencies )
		);

		// Cast to array so that counting is safe whatever the query returned.
		$ids = (array) $wpdb->get_results( $query );

		$can_create = count( $ids ) === count( $dependencies );

		return $can_create ? $ids : false;
	}

	/**
	 * Sets, in the event data, the post IDs of the linked posts found for its dependencies.
	 *
	 * When at least one venue is found, the `venue` entry is replaced with an object holding
	 * the first venue post ID in `_venue_id`; when at least one organizer is found, the
	 * `organizer` entry is replaced with a list of objects each holding a post ID in
	 * `_organizer_id`. Entries for which no linked post is found are left untouched.
	 *
	 * @since 4.6.16
	 *
	 * @param array $data             The event data, modified by reference.
	 * @param array $dependencies_ids A list of rows with `post_id` and `post_type` properties.
	 *
	 * @return array The modified event data.
	 */
	protected function set_linked_posts_ids( &$data, array $dependencies_ids ) {
		$venue_rows = wp_list_filter( $dependencies_ids, [ 'post_type' => Tribe__Events__Venue::POSTTYPE ] );
		$venue_ids  = wp_list_pluck( $venue_rows, 'post_id' );

		if ( ! empty( $venue_ids ) ) {
			// An event has one venue only: the first match is used.
			$data['venue'] = (object) [ '_venue_id' => reset( $venue_ids ) ];
		}

		$organizer_rows = wp_list_filter( $dependencies_ids, [ 'post_type' => Tribe__Events__Organizer::POSTTYPE ] );
		$organizer_ids  = wp_list_pluck( $organizer_rows, 'post_id' );

		if ( ! empty( $organizer_ids ) ) {
			$organizers = [];
			foreach ( $organizer_ids as $organizer_id ) {
				$organizers[] = (object) [ '_organizer_id' => $organizer_id ];
			}
			$data['organizer'] = $organizers;
		}

		return $data;
	}

	/**
	 * Runs the parent completion, then cleans up after this import process.
	 *
	 * All the post meta stored under this process transitional meta key is deleted; then,
	 * if the record can be found, it is marked as successful and its `queue` and
	 * `in_progress` meta are removed.
	 *
	 * @return false|void `false` if the record could not be found.
	 */
	protected function complete() {
		parent::complete();

		/** @var wpdb $wpdb */
		global $wpdb;

		$delete_query = $wpdb->prepare(
			"DELETE FROM {$wpdb->postmeta} WHERE meta_key = %s",
			$this->get_transitional_meta_key()
		);
		$wpdb->query( $delete_query );

		$record           = $this->get_record( $this->record_id );
		$record_is_usable = ! empty( $record ) && ! ( $record instanceof WP_Error );

		if ( ! $record_is_usable ) {
			// Without a record there is nothing left to update.
			return false;
		}

		$record->set_status_as_success();

		foreach ( [ 'queue', 'in_progress' ] as $meta_key ) {
			$record->delete_meta( $meta_key );
		}
	}
}