512 lines
14 KiB
C++
512 lines
14 KiB
C++
/**
|
|
* Orthanc - A Lightweight, RESTful DICOM Store
|
|
* Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
|
|
* Department, University Hospital of Liege, Belgium
|
|
* Copyright (C) 2017-2023 Osimis S.A., Belgium
|
|
* Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
|
|
* Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
|
|
*
|
|
* This program is free software: you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public License
|
|
* as published by the Free Software Foundation, either version 3 of
|
|
* the License, or (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but
|
|
* WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public
|
|
* License along with this program. If not, see
|
|
* <http://www.gnu.org/licenses/>.
|
|
**/
|
|
|
|
|
|
#include "../../PrecompiledHeaders.h"
|
|
#include "SequenceOfOperationsJob.h"
|
|
|
|
#include "../../Logging.h"
|
|
#include "../../OrthancException.h"
|
|
#include "../../SerializationToolbox.h"
|
|
#include "../IJobUnserializer.h"
|
|
|
|
namespace Orthanc
|
|
{
|
|
static const char* CURRENT = "Current";
|
|
static const char* DESCRIPTION = "Description";
|
|
static const char* NEXT_OPERATIONS = "Next";
|
|
static const char* OPERATION = "Operation";
|
|
static const char* OPERATIONS = "Operations";
|
|
static const char* ORIGINAL_INPUTS = "OriginalInputs";
|
|
static const char* TRAILING_TIMEOUT = "TrailingTimeout";
|
|
static const char* TYPE = "Type";
|
|
static const char* WORK_INPUTS = "WorkInputs";
|
|
|
|
|
|
class SequenceOfOperationsJob::Operation : public boost::noncopyable
|
|
{
|
|
private:
|
|
size_t index_;
|
|
std::unique_ptr<IJobOperation> operation_;
|
|
std::unique_ptr<JobOperationValues> originalInputs_;
|
|
std::unique_ptr<JobOperationValues> workInputs_;
|
|
std::list<Operation*> nextOperations_;
|
|
size_t currentInput_;
|
|
|
|
public:
|
|
Operation(size_t index,
|
|
IJobOperation* operation) :
|
|
index_(index),
|
|
operation_(operation),
|
|
originalInputs_(new JobOperationValues),
|
|
workInputs_(new JobOperationValues),
|
|
currentInput_(0)
|
|
{
|
|
if (operation == NULL)
|
|
{
|
|
throw OrthancException(ErrorCode_NullPointer);
|
|
}
|
|
}
|
|
|
|
void AddOriginalInput(const IJobOperationValue& value)
|
|
{
|
|
if (currentInput_ != 0)
|
|
{
|
|
// Cannot add input after processing has started
|
|
throw OrthancException(ErrorCode_BadSequenceOfCalls);
|
|
}
|
|
else
|
|
{
|
|
originalInputs_->Append(value.Clone());
|
|
}
|
|
}
|
|
|
|
const JobOperationValues& GetOriginalInputs() const
|
|
{
|
|
return *originalInputs_;
|
|
}
|
|
|
|
void Reset()
|
|
{
|
|
workInputs_->Clear();
|
|
currentInput_ = 0;
|
|
}
|
|
|
|
void AddNextOperation(Operation& other,
|
|
bool unserializing)
|
|
{
|
|
if (other.index_ <= index_)
|
|
{
|
|
throw OrthancException(ErrorCode_InternalError);
|
|
}
|
|
|
|
if (!unserializing &&
|
|
currentInput_ != 0)
|
|
{
|
|
// Cannot add input after processing has started
|
|
throw OrthancException(ErrorCode_BadSequenceOfCalls);
|
|
}
|
|
else
|
|
{
|
|
nextOperations_.push_back(&other);
|
|
}
|
|
}
|
|
|
|
bool IsDone() const
|
|
{
|
|
return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize();
|
|
}
|
|
|
|
void Step()
|
|
{
|
|
if (IsDone())
|
|
{
|
|
throw OrthancException(ErrorCode_BadSequenceOfCalls);
|
|
}
|
|
|
|
const IJobOperationValue* input;
|
|
|
|
if (currentInput_ < originalInputs_->GetSize())
|
|
{
|
|
input = &originalInputs_->GetValue(currentInput_);
|
|
}
|
|
else
|
|
{
|
|
input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize());
|
|
}
|
|
|
|
JobOperationValues outputs;
|
|
operation_->Apply(outputs, *input);
|
|
|
|
if (!nextOperations_.empty())
|
|
{
|
|
std::list<Operation*>::iterator first = nextOperations_.begin();
|
|
outputs.Move(*(*first)->workInputs_);
|
|
|
|
std::list<Operation*>::iterator current = first;
|
|
++current;
|
|
|
|
while (current != nextOperations_.end())
|
|
{
|
|
(*first)->workInputs_->Copy(*(*current)->workInputs_);
|
|
++current;
|
|
}
|
|
}
|
|
|
|
currentInput_ += 1;
|
|
}
|
|
|
|
void Serialize(Json::Value& target) const
|
|
{
|
|
target = Json::objectValue;
|
|
target[CURRENT] = static_cast<unsigned int>(currentInput_);
|
|
operation_->Serialize(target[OPERATION]);
|
|
originalInputs_->Serialize(target[ORIGINAL_INPUTS]);
|
|
workInputs_->Serialize(target[WORK_INPUTS]);
|
|
|
|
Json::Value tmp = Json::arrayValue;
|
|
for (std::list<Operation*>::const_iterator it = nextOperations_.begin();
|
|
it != nextOperations_.end(); ++it)
|
|
{
|
|
tmp.append(static_cast<int>((*it)->index_));
|
|
}
|
|
|
|
target[NEXT_OPERATIONS] = tmp;
|
|
}
|
|
|
|
Operation(IJobUnserializer& unserializer,
|
|
Json::Value::ArrayIndex index,
|
|
const Json::Value& serialized) :
|
|
index_(index)
|
|
{
|
|
if (serialized.type() != Json::objectValue ||
|
|
!serialized.isMember(OPERATION) ||
|
|
!serialized.isMember(ORIGINAL_INPUTS) ||
|
|
!serialized.isMember(WORK_INPUTS))
|
|
{
|
|
throw OrthancException(ErrorCode_BadFileFormat);
|
|
}
|
|
|
|
currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
|
|
operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION]));
|
|
originalInputs_.reset(JobOperationValues::Unserialize
|
|
(unserializer, serialized[ORIGINAL_INPUTS]));
|
|
workInputs_.reset(JobOperationValues::Unserialize
|
|
(unserializer, serialized[WORK_INPUTS]));
|
|
}
|
|
};
|
|
|
|
|
|
SequenceOfOperationsJob::SequenceOfOperationsJob() :
|
|
done_(false),
|
|
current_(0),
|
|
trailingTimeout_(boost::posix_time::milliseconds(1000))
|
|
{
|
|
}
|
|
|
|
|
|
SequenceOfOperationsJob::~SequenceOfOperationsJob()
|
|
{
|
|
for (size_t i = 0; i < operations_.size(); i++)
|
|
{
|
|
if (operations_[i] != NULL)
|
|
{
|
|
delete operations_[i];
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::SetDescription(const std::string& description)
|
|
{
|
|
boost::mutex::scoped_lock lock(mutex_);
|
|
description_ = description;
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::GetDescription(std::string& description)
|
|
{
|
|
boost::mutex::scoped_lock lock(mutex_);
|
|
description = description_;
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::Register(IObserver& observer)
|
|
{
|
|
boost::mutex::scoped_lock lock(mutex_);
|
|
observers_.push_back(&observer);
|
|
}
|
|
|
|
|
|
#if ORTHANC_BUILDING_FRAMEWORK_LIBRARY == 1
|
|
void SequenceOfOperationsJob::Lock::AddInput(size_t index,
|
|
const JobOperationValue& value)
|
|
{
|
|
throw OrthancException(ErrorCode_DiscontinuedAbi, "Removed in 1.8.1");
|
|
}
|
|
#endif
|
|
|
|
|
|
SequenceOfOperationsJob::Lock::Lock(SequenceOfOperationsJob& that) :
|
|
that_(that),
|
|
lock_(that.mutex_)
|
|
{
|
|
}
|
|
|
|
bool SequenceOfOperationsJob::Lock::IsDone() const
|
|
{
|
|
return that_.done_;
|
|
}
|
|
|
|
void SequenceOfOperationsJob::Lock::SetTrailingOperationTimeout(unsigned int timeout)
|
|
{
|
|
that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
|
|
}
|
|
|
|
|
|
size_t SequenceOfOperationsJob::Lock::AddOperation(IJobOperation* operation)
|
|
{
|
|
if (IsDone())
|
|
{
|
|
throw OrthancException(ErrorCode_BadSequenceOfCalls);
|
|
}
|
|
|
|
size_t index = that_.operations_.size();
|
|
|
|
that_.operations_.push_back(new Operation(index, operation));
|
|
that_.operationAdded_.notify_one();
|
|
|
|
return index;
|
|
}
|
|
|
|
size_t SequenceOfOperationsJob::Lock::GetOperationsCount() const
|
|
{
|
|
return that_.operations_.size();
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::Lock::AddInput(size_t index,
|
|
const IJobOperationValue& value)
|
|
{
|
|
if (IsDone())
|
|
{
|
|
throw OrthancException(ErrorCode_BadSequenceOfCalls);
|
|
}
|
|
else if (index >= that_.operations_.size() ||
|
|
index < that_.current_)
|
|
{
|
|
throw OrthancException(ErrorCode_ParameterOutOfRange);
|
|
}
|
|
else
|
|
{
|
|
that_.operations_[index]->AddOriginalInput(value);
|
|
}
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::Lock::Connect(size_t input,
|
|
size_t output)
|
|
{
|
|
if (IsDone())
|
|
{
|
|
throw OrthancException(ErrorCode_BadSequenceOfCalls);
|
|
}
|
|
else if (input >= output ||
|
|
input >= that_.operations_.size() ||
|
|
output >= that_.operations_.size() ||
|
|
input < that_.current_ ||
|
|
output < that_.current_)
|
|
{
|
|
throw OrthancException(ErrorCode_ParameterOutOfRange);
|
|
}
|
|
else
|
|
{
|
|
Operation& a = *that_.operations_[input];
|
|
Operation& b = *that_.operations_[output];
|
|
a.AddNextOperation(b, false /* not unserializing */);
|
|
}
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::Start()
|
|
{
|
|
}
|
|
|
|
|
|
JobStepResult SequenceOfOperationsJob::Step(const std::string& jobId)
|
|
{
|
|
boost::mutex::scoped_lock lock(mutex_);
|
|
|
|
if (current_ == operations_.size())
|
|
{
|
|
LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
|
|
operationAdded_.timed_wait(lock, trailingTimeout_);
|
|
|
|
if (current_ == operations_.size())
|
|
{
|
|
// No operation was added during the trailing timeout: The
|
|
// job is over
|
|
LOG(INFO) << "The sequence of operations is over";
|
|
done_ = true;
|
|
|
|
for (std::list<IObserver*>::iterator it = observers_.begin();
|
|
it != observers_.end(); ++it)
|
|
{
|
|
(*it)->SignalDone(*this);
|
|
}
|
|
|
|
return JobStepResult::Success();
|
|
}
|
|
else
|
|
{
|
|
LOG(INFO) << "New operation were added to the sequence of operations";
|
|
}
|
|
}
|
|
|
|
assert(current_ < operations_.size());
|
|
|
|
while (current_ < operations_.size() &&
|
|
operations_[current_]->IsDone())
|
|
{
|
|
current_++;
|
|
}
|
|
|
|
if (current_ < operations_.size())
|
|
{
|
|
operations_[current_]->Step();
|
|
}
|
|
|
|
return JobStepResult::Continue();
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::Reset()
|
|
{
|
|
boost::mutex::scoped_lock lock(mutex_);
|
|
|
|
current_ = 0;
|
|
done_ = false;
|
|
|
|
for (size_t i = 0; i < operations_.size(); i++)
|
|
{
|
|
operations_[i]->Reset();
|
|
}
|
|
}
|
|
|
|
void SequenceOfOperationsJob::Stop(JobStopReason reason)
|
|
{
|
|
}
|
|
|
|
|
|
float SequenceOfOperationsJob::GetProgress() const
|
|
{
|
|
boost::mutex::scoped_lock lock(mutex_);
|
|
|
|
return (static_cast<float>(current_) /
|
|
static_cast<float>(operations_.size() + 1));
|
|
}
|
|
|
|
void SequenceOfOperationsJob::GetJobType(std::string& target) const
|
|
{
|
|
target = "SequenceOfOperations";
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::GetPublicContent(Json::Value& value) const
|
|
{
|
|
boost::mutex::scoped_lock lock(mutex_);
|
|
|
|
value["CountOperations"] = static_cast<unsigned int>(operations_.size());
|
|
value["Description"] = description_;
|
|
}
|
|
|
|
|
|
bool SequenceOfOperationsJob::Serialize(Json::Value& value) const
|
|
{
|
|
boost::mutex::scoped_lock lock(mutex_);
|
|
|
|
value = Json::objectValue;
|
|
|
|
std::string jobType;
|
|
GetJobType(jobType);
|
|
value[TYPE] = jobType;
|
|
|
|
value[DESCRIPTION] = description_;
|
|
value[TRAILING_TIMEOUT] = static_cast<unsigned int>(trailingTimeout_.total_milliseconds());
|
|
value[CURRENT] = static_cast<unsigned int>(current_);
|
|
|
|
Json::Value tmp = Json::arrayValue;
|
|
for (size_t i = 0; i < operations_.size(); i++)
|
|
{
|
|
Json::Value operation = Json::objectValue;
|
|
operations_[i]->Serialize(operation);
|
|
tmp.append(operation);
|
|
}
|
|
|
|
value[OPERATIONS] = tmp;
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
void SequenceOfOperationsJob::AwakeTrailingSleep()
|
|
{
|
|
operationAdded_.notify_one();
|
|
}
|
|
|
|
|
|
SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer,
|
|
const Json::Value& serialized) :
|
|
done_(false)
|
|
{
|
|
std::string jobType;
|
|
GetJobType(jobType);
|
|
|
|
if (SerializationToolbox::ReadString(serialized, TYPE) != jobType ||
|
|
!serialized.isMember(OPERATIONS) ||
|
|
serialized[OPERATIONS].type() != Json::arrayValue)
|
|
{
|
|
throw OrthancException(ErrorCode_BadFileFormat);
|
|
}
|
|
|
|
description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION);
|
|
trailingTimeout_ = boost::posix_time::milliseconds
|
|
(SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT));
|
|
current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
|
|
|
|
const Json::Value& ops = serialized[OPERATIONS];
|
|
|
|
// Unserialize the individual operations
|
|
operations_.reserve(ops.size());
|
|
for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
|
|
{
|
|
operations_.push_back(new Operation(unserializer, i, ops[i]));
|
|
}
|
|
|
|
// Connect the next operations
|
|
for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
|
|
{
|
|
if (!ops[i].isMember(NEXT_OPERATIONS) ||
|
|
ops[i][NEXT_OPERATIONS].type() != Json::arrayValue)
|
|
{
|
|
throw OrthancException(ErrorCode_BadFileFormat);
|
|
}
|
|
|
|
const Json::Value& next = ops[i][NEXT_OPERATIONS];
|
|
for (Json::Value::ArrayIndex j = 0; j < next.size(); j++)
|
|
{
|
|
if (next[j].type() != Json::intValue ||
|
|
next[j].asInt() < 0 ||
|
|
next[j].asUInt() >= operations_.size())
|
|
{
|
|
throw OrthancException(ErrorCode_BadFileFormat);
|
|
}
|
|
else
|
|
{
|
|
operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|