get_by_post_id( $record ); } if ( ! is_object( $record ) || ! $record instanceof \Tribe__Events__Aggregator__Record__Abstract ) { $this->null_process = true; return; } if ( is_wp_error( $items ) ) { $this->null_process = true; return; } $this->cleaner = $cleaner ? $cleaner : new Tribe__Events__Aggregator__Record__Queue_Cleaner(); $this->cleaner->remove_duplicate_pending_records_for( $record ); $failed = $this->cleaner->maybe_fail_stalled_record( $record ); if ( $failed ) { $this->null_process = true; return; } $this->record = $record; $this->activity(); if ( ! empty( $items ) ) { if ( 'fetch' === $items ) { $this->is_fetching = true; $this->items = 'fetch'; } else { $this->init_queue( $items ); } $this->save(); } else { $this->load_queue(); } $this->cleaner = $cleaner; } public function __get( $key ) { switch ( $key ) { case 'activity': return $this->activity(); break; } } protected function init_queue( $items ) { if ( 'csv' === $this->record->origin ) { $this->record->reset_tracking_options(); $this->importer = $items; $this->total = $this->importer->get_line_count(); $this->items = array_fill( 0, $this->total, true ); } else { $this->items = $items; // Count the Total of items now and stores as the total $this->total = count( $this->items ); } } protected function load_queue() { if ( empty( $this->record->meta[ self::$queue_key ] ) ) { $this->is_fetching = false; $this->items = []; } else { $this->items = $this->record->meta[ self::$queue_key ]; } if ( 'fetch' === $this->items ) { $this->is_fetching = true; } } public function activity() { if ( empty( $this->activity ) ) { if ( empty( $this->record->meta[ self::$activity_key ] ) || ! $this->record->meta[ self::$activity_key ] instanceof Tribe__Events__Aggregator__Record__Activity ) { $this->activity = new Tribe__Events__Aggregator__Record__Activity; } else { $this->activity = $this->record->meta[ self::$activity_key ]; } } return $this->activity; } /** * Allows us to check if the Events Data has still pending * * @return boolean */ public function is_fetching() { return $this->is_fetching; } /** * Shortcut to check how many items are going to be processed next * * @return int */ public function count() { return is_array( $this->items ) ? count( $this->items ) : 0; } /** * Shortcut to check if this queue is empty or it has a null process. * * @return boolean `true` if this queue instance has acquired the lock and * the count is 0, `false` otherwise. */ public function is_empty() { if ( $this->null_process ) { return true; } return $this->has_lock && 0 === $this->count(); } /** * Gets the queue's total * * @return int */ protected function get_total() { return $this->count() + $this->activity->count( $this->get_queue_type() ); } /** * Saves queue data to relevant meta keys on the post * * @return self */ protected function save() { $this->record->update_meta( self::$activity_key, $this->activity ); /** @var Tribe__Meta__Chunker $chunker */ $chunker = tribe( 'chunker' ); // this data has the potential to be very big, so we register it for possible chunking in the db $key = Tribe__Events__Aggregator__Record__Abstract::$meta_key_prefix . self::$queue_key; $chunker->register_chunking_for( $this->record->post->ID, $key ); if ( empty( $this->items ) ) { $this->record->delete_meta( self::$queue_key ); } else { $this->record->update_meta( self::$queue_key, $this->items ); } // If we have a parent also update that if ( ! empty( $this->record->post->post_parent ) ) { $parent = Tribe__Events__Aggregator__Records::instance()->get_by_post_id( $this->record->post->post_parent ); if ( ! tribe_is_error( $parent ) && isset( $parent->meta[ self::$activity_key ] ) ) { $activity = $parent->meta[ self::$activity_key ]; if ( $activity instanceof Tribe__Events__Aggregator__Record__Activity ) { $parent->update_meta( self::$activity_key, $activity->merge( $this->activity ) ); } } } // Updates the Modified time for the Record Log $args = [ 'ID' => $this->record->post->ID, 'post_modified' => date( Tribe__Date_Utils::DBDATETIMEFORMAT, current_time( 'timestamp' ) ), ]; if ( empty( $this->items ) ) { $args['post_status'] = Tribe__Events__Aggregator__Records::$status->success; } wp_update_post( $args ); $this->release_lock(); return $this; } /** * Processes a batch for the queue * * @return self|Tribe__Events__Aggregator__Record__Activity */ public function process( $batch_size = null ) { if ( $this->null_process ) { return $this; } $this->has_lock = $this->acquire_lock(); if ( $this->has_lock ) { if ( $this->is_fetching() ) { if ( $this->record->should_queue_import() ) { $response = $this->record->queue_import(); if ( $response instanceof WP_Error ) { // the import queueing generated an error $this->record->set_status_as_failed( $response ); return $this; } if ( is_numeric( $response ) ) { // the import queueing was rescheduled $this->record->set_status_as_pending(); return $this; } } $data = $this->record->prep_import_data(); if ( 'fetch' === $data || ! is_array( $data ) || is_wp_error( $data ) ) { $this->release_lock(); $this->is_fetching = false; return $this->activity(); } $this->init_queue( $data ); $this->save(); } if ( ! $batch_size ) { $batch_size = apply_filters( 'tribe_aggregator_batch_size', Tribe__Events__Aggregator__Record__Queue_Processor::$batch_size ); } /* * If the queue system is switched mid-imports this might happen. * In that case we conservatively stop (kill) the queue process. */ if ( ! is_array( $this->items ) ) { $this->kill_queue(); return $this; } // Every time we are about to process we reset the next var $this->next = array_splice( $this->items, 0, $batch_size ); if ( 'csv' === $this->record->origin ) { $activity = $this->record->continue_import(); } else { $activity = $this->record->insert_posts( $this->next ); } $this->activity = $this->activity()->merge( $activity ); } else { // this queue instance should not register any new activity $this->activity = $this->activity(); } return $this->save(); } /** * Returns the total progress made on processing the queue so far as a percentage. * * @return int */ public function progress_percentage() { if ( 0 === $this->count() ) { return 0; } $total = $this->get_total(); $processed = $total - $this->count(); $percent = ( $processed / $total ) * 100; return (int) $percent; } /** * Sets a flag to indicate that update work is in progress for a specific event: * this can be useful to prevent collisions between cron-based updated and realtime * updates. * * The flag naturally expires after an hour to allow for recovery if for instance * execution hangs half way through the processing of a batch. */ public function set_in_progress_flag() { if ( empty( $this->record->id ) ) { return; } Tribe__Post_Transient::instance()->set( $this->record->id, self::$in_progress_key, true, HOUR_IN_SECONDS ); } /** * Clears the in progress flag. */ public function clear_in_progress_flag() { if ( empty( $this->record->id ) ) { return; } Tribe__Post_Transient::instance()->delete( $this->record->id, self::$in_progress_key ); } /** * Indicates if the queue for the current event is actively being processed. * * @return bool */ public function is_in_progress() { if ( empty( $this->record->id ) ) { return false; } Tribe__Post_Transient::instance()->get( $this->record->id, self::$in_progress_key ); } /** * Returns the primary post type the queue is processing * * @return string */ public function get_queue_type() { $item_type = Tribe__Events__Main::POSTTYPE; if ( ! empty( $this->record->origin ) && 'csv' === $this->record->origin ) { $item_type = $this->record->meta['content_type']; } return $item_type; } /** * Acquires the global (db stored) queue lock if available. * * @since 4.5.12 * * @return bool Whether the lock could be acquired or not if another instance/process has * already acquired the lock. */ protected function acquire_lock() { if ( empty( $this->record->post->ID ) ) { return false; } $post_id = $this->record->post->ID; $post_transient = Tribe__Post_Transient::instance(); $locked = $post_transient->get( $post_id, 'aggregator_queue_lock' ); if ( ! empty( $locked ) ) { return false; } $post_transient->set( $post_id, 'aggregator_queue_lock', '1', 180 ); return true; } /** * Release the queue lock if this instance of the queue holds it. * * @since 4.5.12 * * @return bool */ protected function release_lock() { if ( empty( $this->record->post->ID ) || ! $this->has_lock ) { return false; } $post_id = $this->record->post->ID; $post_transient = Tribe__Post_Transient::instance(); $post_transient->delete( $post_id, 'aggregator_queue_lock' ); return true; } /** * Whether the current queue process is stuck or not. * * @since 4.6.21 * * @return mixed */ public function is_stuck() { return false; } /** * Orderly closes the queue process. * * @since 4.6.21 * * @return bool */ public function kill_queue() { return true; } /** * Whether the current queue process failed or not. * * @since 4.6.21 * * @return bool */ public function has_errors() { return false; } /** * Returns the queue error message. * * @since 4.6.21 * * @return string */ public function get_error_message() { return ''; } }