/**
* Orthanc - A Lightweight, RESTful DICOM Store
* Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
* Department, University Hospital of Liege, Belgium
* Copyright (C) 2017-2023 Osimis S.A., Belgium
* Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
* Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
**/
#include "ThreadedSetOfInstancesJob.h"
#include "../../../OrthancFramework/Sources/Logging.h"
#include "../../../OrthancFramework/Sources/OrthancException.h"
#include "../../../OrthancFramework/Sources/SerializationToolbox.h"
#include "../ServerContext.h"
#include
#include
namespace Orthanc
{
static const char* EXIT_WORKER_MESSAGE = "exit";
ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
bool hasPostProcessing,
bool keepSource,
size_t workersCount) :
hasPostProcessing_(hasPostProcessing),
started_(false),
stopRequested_(false),
permissive_(false),
currentStep_(ThreadedJobStep_NotStarted),
workersCount_(workersCount),
context_(context),
keepSource_(keepSource),
errorCode_(ErrorCode_Success)
{
}
ThreadedSetOfInstancesJob::~ThreadedSetOfInstancesJob()
{
// no need to lock mutex here since we access variables used only by the "master" thread
StopWorkers();
WaitWorkersComplete();
}
void ThreadedSetOfInstancesJob::InitWorkers(size_t workersCount)
{
// no need to lock mutex here since we access variables used only by the "master" thread
for (size_t i = 0; i < workersCount; i++)
{
instancesWorkers_.push_back(boost::shared_ptr(new boost::thread(InstanceWorkerThread, this)));
}
}
void ThreadedSetOfInstancesJob::WaitWorkersComplete()
{
// no need to lock mutex here since we access variables used only by the "master" thread
// send a dummy "exit" message to all workers such that they stop waiting for messages on the queue
for (size_t i = 0; i < instancesWorkers_.size(); i++)
{
instancesToProcessQueue_.Enqueue(new SingleValueObject(EXIT_WORKER_MESSAGE));
}
for (size_t i = 0; i < instancesWorkers_.size(); i++)
{
if (instancesWorkers_[i]->joinable())
{
instancesWorkers_[i]->join();
}
}
instancesWorkers_.clear();
}
void ThreadedSetOfInstancesJob::StopWorkers()
{
boost::recursive_mutex::scoped_lock lock(mutex_);
instancesToProcessQueue_.Clear();
stopRequested_ = true;
}
void ThreadedSetOfInstancesJob::Stop(JobStopReason reason)
{
// no need to lock mutex here since we access variables used or set only by the "master" thread
if (reason == JobStopReason_Canceled ||
reason == JobStopReason_Failure ||
reason == JobStopReason_Retry)
{
// deallocate resources
StopWorkers();
WaitWorkersComplete();
}
else if (reason == JobStopReason_Paused)
{
// keep resources allocated.
// note that, right now, since all instances are queued from the start, this kind of jobs is not paused while in ProcessingInstances state
}
}
JobStepResult ThreadedSetOfInstancesJob::Step(const std::string& jobId)
{
// no need to lock mutex here since we access variables used or set only by the "master" thread
if (!started_)
{
throw OrthancException(ErrorCode_InternalError);
}
if (GetInstancesCount() == 0)
{
// No instances to handle: We're done
return JobStepResult::Success();
}
try
{
if (currentStep_ == ThreadedJobStep_NotStarted)
{
// create the workers and enqueue all instances
for (std::set::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
{
instancesToProcessQueue_.Enqueue(new SingleValueObject(*it));
}
InitWorkers(workersCount_);
currentStep_ = ThreadedJobStep_ProcessingInstances;
}
else if (currentStep_ == ThreadedJobStep_ProcessingInstances)
{
// wait until all instances are processed by the workers
if (instancesToProcessQueue_.GetSize() != 0)
{
// "slow down" the job main thread, to avoid using 100% of a core simply to check that other threads are done
boost::this_thread::sleep(boost::posix_time::milliseconds(5));
return JobStepResult::Continue();
}
else
{
WaitWorkersComplete();
// check job has really completed !!! it might have been interrupted because of an error
if ((processedInstances_.size() != instancesToProcess_.size())
|| (!IsPermissive() && failedInstances_.size() > 0))
{
return JobStepResult::Failure(GetErrorCode(), NULL);
}
currentStep_ = ThreadedJobStep_PostProcessingInstances;
return JobStepResult::Continue();
}
}
else if (currentStep_ == ThreadedJobStep_PostProcessingInstances)
{
if (HasPostProcessingStep())
{
PostProcessInstances();
}
currentStep_ = ThreadedJobStep_Cleanup;
return JobStepResult::Continue();
}
else if (currentStep_ == ThreadedJobStep_Cleanup)
{
// clean after the post processing step
if (HasCleanupStep())
{
for (std::set::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it)
{
Json::Value tmp;
context_.DeleteResource(tmp, *it, ResourceType_Instance);
}
}
currentStep_ = ThreadedJobStep_Done;
return JobStepResult::Success();
}
}
catch (OrthancException& e)
{
if (permissive_)
{
LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What();
}
else
{
return JobStepResult::Failure(e);
}
}
return JobStepResult::Continue();
}
bool ThreadedSetOfInstancesJob::HasPostProcessingStep() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return hasPostProcessing_;
}
void ThreadedSetOfInstancesJob::PostProcessInstances()
{
if (HasPostProcessingStep())
{
throw OrthancException(ErrorCode_InternalError, "Job with post-processing should override PostProcessInstances");
}
}
bool ThreadedSetOfInstancesJob::HasCleanupStep() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return !keepSource_;
}
void ThreadedSetOfInstancesJob::InstanceWorkerThread(ThreadedSetOfInstancesJob* that)
{
static uint16_t threadCounter = 0;
Logging::SetCurrentThreadName(std::string("JOB-INS-WORK-") + boost::lexical_cast(threadCounter++));
threadCounter %= 1000;
while (true)
{
std::unique_ptr > instanceId(dynamic_cast*>(that->instancesToProcessQueue_.Dequeue(0)));
if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean
|| instanceId->GetValue() == EXIT_WORKER_MESSAGE)
{
return;
}
bool processed = false;
try
{
processed = that->HandleInstance(instanceId->GetValue());
}
catch (const Orthanc::OrthancException& e)
{
if (that->IsPermissive())
{
LOG(WARNING) << "Ignoring an error in a permissive job: " << e.What();
}
else
{
LOG(ERROR) << "Error in a non-permissive job: " << e.What();
that->SetErrorCode(e.GetErrorCode());
that->StopWorkers();
return;
}
}
catch (...)
{
LOG(ERROR) << "Native exception while executing a job";
that->SetErrorCode(ErrorCode_InternalError);
that->StopWorkers();
return;
}
{
boost::recursive_mutex::scoped_lock lock(that->mutex_);
that->processedInstances_.insert(instanceId->GetValue());
if (!processed)
{
that->failedInstances_.insert(instanceId->GetValue());
}
}
}
}
size_t ThreadedSetOfInstancesJob::GetInstancesCount() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return instancesToProcess_.size();
}
void ThreadedSetOfInstancesJob::GetFailedInstances(std::set& target) const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
target = failedInstances_;
}
void ThreadedSetOfInstancesJob::GetInstances(std::set& target) const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
target = instancesToProcess_;
}
bool ThreadedSetOfInstancesJob::IsFailedInstance(const std::string &instance) const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return failedInstances_.find(instance) != failedInstances_.end();
}
void ThreadedSetOfInstancesJob::Start()
{
boost::recursive_mutex::scoped_lock lock(mutex_);
started_ = true;
}
// Reset is called when resubmitting a failed job
void ThreadedSetOfInstancesJob::Reset()
{
boost::recursive_mutex::scoped_lock lock(mutex_);
if (started_)
{
// We actually cannot clean the instances that would have been generated during a previous run
// because the generated instances may or may not have the same orthanc ids as the source
// it is too dangerous to guess if they should be deleted or not
currentStep_ = ThreadedJobStep_NotStarted;
stopRequested_ = false;
processedInstances_.clear();
failedInstances_.clear();
instancesToProcessQueue_.Clear();
}
else
{
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
}
static const char* KEY_FAILED_INSTANCES = "FailedInstances";
static const char* KEY_PARENT_RESOURCES = "ParentResources";
static const char* KEY_DESCRIPTION = "Description";
static const char* KEY_PERMISSIVE = "Permissive";
static const char* KEY_CURRENT_STEP = "CurrentStep";
static const char* KEY_TYPE = "Type";
static const char* KEY_INSTANCES = "Instances";
static const char* KEY_INSTANCES_COUNT = "InstancesCount";
static const char* KEY_FAILED_INSTANCES_COUNT = "FailedInstancesCount";
static const char* KEY_KEEP_SOURCE = "KeepSource";
static const char* KEY_WORKERS_COUNT = "WorkersCount";
void ThreadedSetOfInstancesJob::GetPublicContent(Json::Value& target) const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
target[KEY_DESCRIPTION] = GetDescription();
target[KEY_INSTANCES_COUNT] = static_cast(GetInstancesCount());
target[KEY_FAILED_INSTANCES_COUNT] = static_cast(failedInstances_.size());
if (!parentResources_.empty())
{
SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
}
}
bool ThreadedSetOfInstancesJob::Serialize(Json::Value& target) const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
target = Json::objectValue;
std::string type;
GetJobType(type);
target[KEY_TYPE] = type;
target[KEY_PERMISSIVE] = permissive_;
target[KEY_CURRENT_STEP] = static_cast(currentStep_);
target[KEY_DESCRIPTION] = description_;
target[KEY_KEEP_SOURCE] = keepSource_;
target[KEY_WORKERS_COUNT] = static_cast(workersCount_);
SerializationToolbox::WriteSetOfStrings(target, instancesToProcess_, KEY_INSTANCES);
SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES);
SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES);
return true;
}
ThreadedSetOfInstancesJob::ThreadedSetOfInstancesJob(ServerContext& context,
const Json::Value& source,
bool hasPostProcessing,
bool defaultKeepSource) :
hasPostProcessing_(hasPostProcessing),
started_(false),
stopRequested_(false),
permissive_(false),
currentStep_(ThreadedJobStep_NotStarted),
workersCount_(1),
context_(context),
keepSource_(defaultKeepSource),
errorCode_(ErrorCode_Success)
{
SerializationToolbox::ReadSetOfStrings(failedInstances_, source, KEY_FAILED_INSTANCES);
if (source.isMember(KEY_PARENT_RESOURCES))
{
// Backward compatibility with Orthanc <= 1.5.6
SerializationToolbox::ReadSetOfStrings(parentResources_, source, KEY_PARENT_RESOURCES);
}
if (source.isMember(KEY_KEEP_SOURCE))
{
keepSource_ = SerializationToolbox::ReadBoolean(source, KEY_KEEP_SOURCE);
}
if (source.isMember(KEY_WORKERS_COUNT))
{
workersCount_ = SerializationToolbox::ReadUnsignedInteger(source, KEY_WORKERS_COUNT);
}
if (source.isMember(KEY_INSTANCES))
{
SerializationToolbox::ReadSetOfStrings(instancesToProcess_, source, KEY_INSTANCES);
}
if (source.isMember(KEY_CURRENT_STEP))
{
currentStep_ = static_cast(SerializationToolbox::ReadUnsignedInteger(source, KEY_CURRENT_STEP));
}
}
void ThreadedSetOfInstancesJob::SetKeepSource(bool keep)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
if (IsStarted())
{
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
keepSource_ = keep;
}
bool ThreadedSetOfInstancesJob::IsKeepSource() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return keepSource_;
}
float ThreadedSetOfInstancesJob::GetProgress() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
if (GetInstancesCount() == 0)
{
return 1;
}
else
{
size_t totalProgress = GetInstancesCount();
size_t currentProgress = processedInstances_.size();
if (HasPostProcessingStep())
{
++totalProgress;
if (currentStep_ > ThreadedJobStep_PostProcessingInstances)
{
++currentProgress;
}
}
if (HasCleanupStep())
{
++totalProgress;
if (currentStep_ > ThreadedJobStep_Cleanup)
{
++currentProgress;
}
}
return (static_cast(currentProgress) /
static_cast(totalProgress));
}
}
ThreadedSetOfInstancesJob::ThreadedJobStep ThreadedSetOfInstancesJob::GetCurrentStep() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return currentStep_;
}
void ThreadedSetOfInstancesJob::SetDescription(const std::string &description)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
description_ = description;
}
const std::string& ThreadedSetOfInstancesJob::GetDescription() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return description_;
}
void ThreadedSetOfInstancesJob::SetErrorCode(ErrorCode errorCode)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
errorCode_ = errorCode;
}
ErrorCode ThreadedSetOfInstancesJob::GetErrorCode() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return errorCode_;
}
bool ThreadedSetOfInstancesJob::IsPermissive() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return permissive_;
}
void ThreadedSetOfInstancesJob::SetPermissive(bool permissive)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
if (started_)
{
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
else
{
permissive_ = permissive;
}
}
bool ThreadedSetOfInstancesJob::IsStarted() const
{
boost::recursive_mutex::scoped_lock lock(mutex_);
return started_;
}
void ThreadedSetOfInstancesJob::AddInstances(const std::list& instances)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
for (std::list::const_iterator
it = instances.begin(); it != instances.end(); ++it)
{
instancesToProcess_.insert(*it);
}
}
void ThreadedSetOfInstancesJob::AddParentResource(const std::string &resource)
{
boost::recursive_mutex::scoped_lock lock(mutex_);
parentResources_.insert(resource);
}
}