20 $obBucket->FAILOVER_BUCKET_ID > 0
21 && $obBucket->FAILOVER_DELETE ===
'Y'
22 && $obBucket->getQueueFlag()
27 && ($obBucket->FAILOVER_ACTIVE ===
'Y')
30 $BUCKET_ID = $obBucket->ID;
34 $BUCKET_ID = $obBucket->FAILOVER_BUCKET_ID;
37 'TIMESTAMP_X' =>
new \
Bitrix\Main\Type\DateTime(),
38 'BUCKET_ID' => $BUCKET_ID,
39 'FILE_PATH' => $FILE_PATH,
44 public static function queueCopy($obBucket, $FILE_PATH)
47 $obBucket->FAILOVER_BUCKET_ID > 0
48 && $obBucket->FAILOVER_COPY ===
'Y'
49 && $obBucket->getQueueFlag()
54 && ($obBucket->FAILOVER_ACTIVE ===
'Y')
57 $TARGET_BUCKET_ID = $obBucket->ID;
58 $SOURCE_BUCKET_ID = $obBucket->FAILOVER_BUCKET_ID;
62 $TARGET_BUCKET_ID = $obBucket->FAILOVER_BUCKET_ID;
63 $SOURCE_BUCKET_ID = $obBucket->ID;
67 'TIMESTAMP_X' =>
new \
Bitrix\Main\Type\DateTime(),
68 'OP' => \
Bitrix\Clouds\CopyQueueTable::OP_COPY,
69 'SOURCE_BUCKET_ID' => $SOURCE_BUCKET_ID,
70 'SOURCE_FILE_PATH' => $FILE_PATH,
71 'TARGET_BUCKET_ID' => $TARGET_BUCKET_ID,
72 'TARGET_FILE_PATH' => $FILE_PATH,
78 '=BUCKET_ID' => $TARGET_BUCKET_ID,
79 '=FILE_PATH' => $FILE_PATH,
82 while ($task = $deleteTasks->fetch())
89 public static function queueRename($obBucket, $FILE_PATH_FROM, $FILE_PATH_TO)
92 $obBucket->FAILOVER_BUCKET_ID > 0
93 && $obBucket->FAILOVER_COPY ===
'Y'
94 && $obBucket->getQueueFlag()
99 && ($obBucket->FAILOVER_ACTIVE ===
'Y')
102 $BUCKET_ID = $obBucket->ID;
106 $BUCKET_ID = $obBucket->FAILOVER_BUCKET_ID;
110 'TIMESTAMP_X' =>
new \
Bitrix\Main\Type\DateTime(),
111 'OP' => \
Bitrix\Clouds\CopyQueueTable::OP_RENAME,
112 'SOURCE_BUCKET_ID' => $BUCKET_ID,
113 'SOURCE_FILE_PATH' => $FILE_PATH_FROM,
114 'TARGET_BUCKET_ID' => $BUCKET_ID,
115 'TARGET_FILE_PATH' => $FILE_PATH_TO,
121 '=BUCKET_ID' => $BUCKET_ID,
122 '=FILE_PATH' => $FILE_PATH_TO,
125 while ($task = $deleteTasks->fetch())
136 'order' => [
'ID' =>
'ASC']
143 && ($testBucket->FAILOVER_ACTIVE ===
'Y')
150 if ((time() - $deleteTask[
'TIMESTAMP_X']->getTimestamp()) > $obBucket->FAILOVER_DELETE_DELAY)
152 if ($obBucket->Init())
154 $obBucket->setQueueFlag(
false);
158 $fileExists = $obBucket->FileExists($deleteTask[
'FILE_PATH']);
161 $fileSize = $obBucket->GetFileSize($deleteTask[
'FILE_PATH']);
163 $result = $obBucket->DeleteFile($deleteTask[
'FILE_PATH']);
166 $obBucket->DecFileCounter($fileSize);
171 $obBucket->DeleteFile($deleteTask[
'FILE_PATH']);
188 'filter' => [
'=STATUS' =>
'Y'],
190 'order' => [
'ID' =>
'ASC']
194 if ($task[
'OP'] == \
Bitrix\Clouds\CopyQueueTable::OP_RENAME)
196 return static::executeRenameTask($task);
198 elseif ($task[
'OP'] == \
Bitrix\Clouds\CopyQueueTable::OP_COPY)
200 return static::executeCopyTask($task,
true);
202 elseif ($task[
'OP'] == \
Bitrix\Clouds\CopyQueueTable::OP_SYNC)
204 return static::executeCopyTask($task,
false);
225 && ($testBucket->FAILOVER_ACTIVE ===
'Y')
234 && ($testBucket->FAILOVER_ACTIVE ===
'Y')
242 if (!$sourceBucket->Init())
245 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
246 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
247 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): failed to init source bucket.'
253 if (!$targetBucket->Init())
256 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
257 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
258 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): failed to init target bucket.'
264 if (!$sourceBucket->FileExists($copyTask[
'SOURCE_FILE_PATH']))
267 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
268 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
269 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): source file does not exists.'
274 $CONTENT_TYPE = $sourceBucket->getService()->GetLastRequestHeader(
'Content-Type');
275 $CONTENT_LENGTH = $sourceBucket->getService()->GetLastRequestHeader(
'Content-Length');
277 if ($copyTask[
'FILE_SIZE'] == -1)
281 $copyTask[
'FILE_SIZE'] = intval($CONTENT_LENGTH);
285 $copyTask[
'FILE_SIZE'] = $sourceBucket->GetFileSize($copyTask[
'SOURCE_FILE_PATH']);
288 'FILE_SIZE' => $copyTask[
'FILE_SIZE'],
292 $targetBucket->setQueueFlag(
false);
293 $tempPath = $copyTask[
'TARGET_FILE_PATH'] .
'.fail-over-copy-part';
295 $CLOchunkSize = $targetBucket->getService()->GetMinUploadPartSize();
296 if ($copyTask[
'FILE_SIZE'] <= $CLOchunkSize)
298 $http = new \Bitrix\Main\Web\HttpClient([
299 'streamTimeout' => 0,
302 'type' => $CONTENT_TYPE,
305 $arFile[
'content'] = $http->get($sourceBucket->GetFileSRC($copyTask[
'SOURCE_FILE_PATH']));
306 if ($arFile[
'content'] ===
false)
309 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
310 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
311 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): failed to download.'
316 if (!$overwrite && $targetBucket->FileExists($copyTask[
'TARGET_FILE_PATH']))
322 $res = $targetBucket->SaveFile($copyTask[
'TARGET_FILE_PATH'], $arFile);
331 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
332 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
333 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): failed to upload file.'
340 if ($copyTask[
'FILE_POS'] == 0)
342 if (!$overwrite && $targetBucket->FileExists($copyTask[
'TARGET_FILE_PATH']))
348 if (!$upload->isStarted())
350 if (!$upload->Start($targetBucket, $copyTask[
'FILE_SIZE'], $CONTENT_TYPE))
353 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
354 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
355 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): failed to start upload.'
364 $http = new \Bitrix\Main\Web\HttpClient();
365 $rangeStart = $copyTask[
'FILE_POS'];
366 $rangeEnd = min($copyTask[
'FILE_POS'] + $targetBucket->getService()->GetMinUploadPartSize(), $copyTask[
'FILE_SIZE']) - 1;
367 $http->setHeader(
'Range',
'bytes=' . $rangeStart .
'-' . $rangeEnd);
368 $data = $http->get($sourceBucket->GetFileSRC($copyTask[
'SOURCE_FILE_PATH']));
370 $uploadResult =
false;
371 while ($upload->hasRetries())
373 if ($upload->Next(
$data, $targetBucket))
375 $uploadResult =
true;
383 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
384 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
385 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): upload part failed.'
390 $filePos = $upload->GetPos();
393 if ($filePos < $copyTask[
'FILE_SIZE'])
396 'FILE_POS' => $filePos,
401 if (!$upload->Finish($targetBucket))
404 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
405 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
406 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): finish has failed.'
413 $targetBucket->IncFileCounter($copyTask[
'FILE_SIZE']);
416 if ($overwrite && $targetBucket->FileExists($copyTask[
'TARGET_FILE_PATH']))
418 $fileSize = $targetBucket->GetFileSize($copyTask[
'TARGET_FILE_PATH']);
419 if ($targetBucket->DeleteFile($copyTask[
'TARGET_FILE_PATH']))
423 $targetBucket->DecFileCounter($fileSize);
428 if (!$targetBucket->FileRename($tempPath, $copyTask[
'TARGET_FILE_PATH']))
431 'FAIL_COUNTER' => $copyTask[
'FAIL_COUNTER'] + 1,
432 'STATUS' => $copyTask[
'FAIL_COUNTER'] >= COption::GetOptionInt(
'clouds',
'max_copy_fail_count') ?
'N' : $copyTask[
'STATUS'],
433 'ERROR_MESSAGE' =>
'CCloudFailover::executeCopyQueue(' . $copyTask[
'ID'] .
'): rename failed.'
506 public static function syncAgent($bucketFrom, $bucketTo, $limit = 100)
508 $bucketFrom = intval($bucketFrom);
509 $bucketTo = intval($bucketTo);
510 $limit = intval($limit);
517 $etime = time() + COption::GetOptionInt(
'clouds',
'sync_agent_time');
521 'select' => [
'SOURCE_FILE_PATH'],
523 '=OP' => \
Bitrix\Clouds\CopyQueueTable::OP_SYNC,
524 '=SOURCE_BUCKET_ID' => $bucketFrom,
525 '=TARGET_BUCKET_ID' => $bucketTo,
527 'order' => [
'ID' =>
'DESC'],
530 $lastKey = $lastJob ? ltrim($lastJob[
'SOURCE_FILE_PATH'],
'/') :
'';
532 $files = $bucket->ListFiles(
'/',
true, $limit, $lastKey);
533 if ($files ===
false || empty($files[
'file']))
541 'TIMESTAMP_X' =>
new \
Bitrix\Main\Type\DateTime(),
542 'OP' => \
Bitrix\Clouds\CopyQueueTable::OP_SYNC,
543 'SOURCE_BUCKET_ID' => $bucketFrom,
545 'TARGET_BUCKET_ID' => $bucketTo,
550 while (time() < $etime);
555 return 'CCloudFailover::syncAgent(' . $bucketFrom .
', ' . $bucketTo .
', ' . $limit .
');';
562 $max_parallel_count = COption::GetOptionInt(
'clouds',
'agents_max_parallel_count');
563 if ($max_parallel_count == 0)
567 elseif ($max_parallel_count == 1)
569 if (self::_lock_by_id(0))
576 for (
$i = 0;
$i < $max_parallel_count;
$i++)
578 $lockId = mt_rand(0, $max_parallel_count - 1);
579 if (self::_lock_by_id($lockId))
584 for (
$i = 0;
$i < $max_parallel_count;
$i++)
586 if (self::_lock_by_id(
$i))
607 $lock_file_template = CTempFile::GetAbsoluteRoot() .
'/clouds-%d.lock';
608 $lock_file_name = sprintf($lock_file_template, $lockId);
609 static::$lock = fopen($lock_file_name,
'w');
614 $locked = flock(static::$lock, LOCK_EX | LOCK_NB);
617 fclose(static::$lock);
618 static::$lock =
false;