/**
* Orthanc - A Lightweight, RESTful DICOM Store
* Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
* Department, University Hospital of Liege, Belgium
* Copyright (C) 2017-2023 Osimis S.A., Belgium
* Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
* Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program. If not, see
* .
**/
#include "../../PrecompiledHeaders.h"
#include "SequenceOfOperationsJob.h"
#include "../../Logging.h"
#include "../../OrthancException.h"
#include "../../SerializationToolbox.h"
#include "../IJobUnserializer.h"
namespace Orthanc
{
static const char* CURRENT = "Current";
static const char* DESCRIPTION = "Description";
static const char* NEXT_OPERATIONS = "Next";
static const char* OPERATION = "Operation";
static const char* OPERATIONS = "Operations";
static const char* ORIGINAL_INPUTS = "OriginalInputs";
static const char* TRAILING_TIMEOUT = "TrailingTimeout";
static const char* TYPE = "Type";
static const char* WORK_INPUTS = "WorkInputs";
class SequenceOfOperationsJob::Operation : public boost::noncopyable
{
private:
size_t index_;
std::unique_ptr operation_;
std::unique_ptr originalInputs_;
std::unique_ptr workInputs_;
std::list nextOperations_;
size_t currentInput_;
public:
Operation(size_t index,
IJobOperation* operation) :
index_(index),
operation_(operation),
originalInputs_(new JobOperationValues),
workInputs_(new JobOperationValues),
currentInput_(0)
{
if (operation == NULL)
{
throw OrthancException(ErrorCode_NullPointer);
}
}
void AddOriginalInput(const IJobOperationValue& value)
{
if (currentInput_ != 0)
{
// Cannot add input after processing has started
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
else
{
originalInputs_->Append(value.Clone());
}
}
const JobOperationValues& GetOriginalInputs() const
{
return *originalInputs_;
}
void Reset()
{
workInputs_->Clear();
currentInput_ = 0;
}
void AddNextOperation(Operation& other,
bool unserializing)
{
if (other.index_ <= index_)
{
throw OrthancException(ErrorCode_InternalError);
}
if (!unserializing &&
currentInput_ != 0)
{
// Cannot add input after processing has started
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
else
{
nextOperations_.push_back(&other);
}
}
bool IsDone() const
{
return currentInput_ >= originalInputs_->GetSize() + workInputs_->GetSize();
}
void Step()
{
if (IsDone())
{
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
const IJobOperationValue* input;
if (currentInput_ < originalInputs_->GetSize())
{
input = &originalInputs_->GetValue(currentInput_);
}
else
{
input = &workInputs_->GetValue(currentInput_ - originalInputs_->GetSize());
}
JobOperationValues outputs;
operation_->Apply(outputs, *input);
if (!nextOperations_.empty())
{
std::list::iterator first = nextOperations_.begin();
outputs.Move(*(*first)->workInputs_);
std::list::iterator current = first;
++current;
while (current != nextOperations_.end())
{
(*first)->workInputs_->Copy(*(*current)->workInputs_);
++current;
}
}
currentInput_ += 1;
}
void Serialize(Json::Value& target) const
{
target = Json::objectValue;
target[CURRENT] = static_cast(currentInput_);
operation_->Serialize(target[OPERATION]);
originalInputs_->Serialize(target[ORIGINAL_INPUTS]);
workInputs_->Serialize(target[WORK_INPUTS]);
Json::Value tmp = Json::arrayValue;
for (std::list::const_iterator it = nextOperations_.begin();
it != nextOperations_.end(); ++it)
{
tmp.append(static_cast((*it)->index_));
}
target[NEXT_OPERATIONS] = tmp;
}
Operation(IJobUnserializer& unserializer,
Json::Value::ArrayIndex index,
const Json::Value& serialized) :
index_(index)
{
if (serialized.type() != Json::objectValue ||
!serialized.isMember(OPERATION) ||
!serialized.isMember(ORIGINAL_INPUTS) ||
!serialized.isMember(WORK_INPUTS))
{
throw OrthancException(ErrorCode_BadFileFormat);
}
currentInput_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
operation_.reset(unserializer.UnserializeOperation(serialized[OPERATION]));
originalInputs_.reset(JobOperationValues::Unserialize
(unserializer, serialized[ORIGINAL_INPUTS]));
workInputs_.reset(JobOperationValues::Unserialize
(unserializer, serialized[WORK_INPUTS]));
}
};
SequenceOfOperationsJob::SequenceOfOperationsJob() :
done_(false),
current_(0),
trailingTimeout_(boost::posix_time::milliseconds(1000))
{
}
SequenceOfOperationsJob::~SequenceOfOperationsJob()
{
for (size_t i = 0; i < operations_.size(); i++)
{
if (operations_[i] != NULL)
{
delete operations_[i];
}
}
}
void SequenceOfOperationsJob::SetDescription(const std::string& description)
{
boost::mutex::scoped_lock lock(mutex_);
description_ = description;
}
void SequenceOfOperationsJob::GetDescription(std::string& description)
{
boost::mutex::scoped_lock lock(mutex_);
description = description_;
}
void SequenceOfOperationsJob::Register(IObserver& observer)
{
boost::mutex::scoped_lock lock(mutex_);
observers_.push_back(&observer);
}
#if ORTHANC_BUILDING_FRAMEWORK_LIBRARY == 1
void SequenceOfOperationsJob::Lock::AddInput(size_t index,
const JobOperationValue& value)
{
throw OrthancException(ErrorCode_DiscontinuedAbi, "Removed in 1.8.1");
}
#endif
SequenceOfOperationsJob::Lock::Lock(SequenceOfOperationsJob& that) :
that_(that),
lock_(that.mutex_)
{
}
bool SequenceOfOperationsJob::Lock::IsDone() const
{
return that_.done_;
}
void SequenceOfOperationsJob::Lock::SetTrailingOperationTimeout(unsigned int timeout)
{
that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
}
size_t SequenceOfOperationsJob::Lock::AddOperation(IJobOperation* operation)
{
if (IsDone())
{
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
size_t index = that_.operations_.size();
that_.operations_.push_back(new Operation(index, operation));
that_.operationAdded_.notify_one();
return index;
}
size_t SequenceOfOperationsJob::Lock::GetOperationsCount() const
{
return that_.operations_.size();
}
void SequenceOfOperationsJob::Lock::AddInput(size_t index,
const IJobOperationValue& value)
{
if (IsDone())
{
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
else if (index >= that_.operations_.size() ||
index < that_.current_)
{
throw OrthancException(ErrorCode_ParameterOutOfRange);
}
else
{
that_.operations_[index]->AddOriginalInput(value);
}
}
void SequenceOfOperationsJob::Lock::Connect(size_t input,
size_t output)
{
if (IsDone())
{
throw OrthancException(ErrorCode_BadSequenceOfCalls);
}
else if (input >= output ||
input >= that_.operations_.size() ||
output >= that_.operations_.size() ||
input < that_.current_ ||
output < that_.current_)
{
throw OrthancException(ErrorCode_ParameterOutOfRange);
}
else
{
Operation& a = *that_.operations_[input];
Operation& b = *that_.operations_[output];
a.AddNextOperation(b, false /* not unserializing */);
}
}
void SequenceOfOperationsJob::Start()
{
}
JobStepResult SequenceOfOperationsJob::Step(const std::string& jobId)
{
boost::mutex::scoped_lock lock(mutex_);
if (current_ == operations_.size())
{
LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
operationAdded_.timed_wait(lock, trailingTimeout_);
if (current_ == operations_.size())
{
// No operation was added during the trailing timeout: The
// job is over
LOG(INFO) << "The sequence of operations is over";
done_ = true;
for (std::list::iterator it = observers_.begin();
it != observers_.end(); ++it)
{
(*it)->SignalDone(*this);
}
return JobStepResult::Success();
}
else
{
LOG(INFO) << "New operation were added to the sequence of operations";
}
}
assert(current_ < operations_.size());
while (current_ < operations_.size() &&
operations_[current_]->IsDone())
{
current_++;
}
if (current_ < operations_.size())
{
operations_[current_]->Step();
}
return JobStepResult::Continue();
}
void SequenceOfOperationsJob::Reset()
{
boost::mutex::scoped_lock lock(mutex_);
current_ = 0;
done_ = false;
for (size_t i = 0; i < operations_.size(); i++)
{
operations_[i]->Reset();
}
}
void SequenceOfOperationsJob::Stop(JobStopReason reason)
{
}
float SequenceOfOperationsJob::GetProgress() const
{
boost::mutex::scoped_lock lock(mutex_);
return (static_cast(current_) /
static_cast(operations_.size() + 1));
}
void SequenceOfOperationsJob::GetJobType(std::string& target) const
{
target = "SequenceOfOperations";
}
void SequenceOfOperationsJob::GetPublicContent(Json::Value& value) const
{
boost::mutex::scoped_lock lock(mutex_);
value["CountOperations"] = static_cast(operations_.size());
value["Description"] = description_;
}
bool SequenceOfOperationsJob::Serialize(Json::Value& value) const
{
boost::mutex::scoped_lock lock(mutex_);
value = Json::objectValue;
std::string jobType;
GetJobType(jobType);
value[TYPE] = jobType;
value[DESCRIPTION] = description_;
value[TRAILING_TIMEOUT] = static_cast(trailingTimeout_.total_milliseconds());
value[CURRENT] = static_cast(current_);
Json::Value tmp = Json::arrayValue;
for (size_t i = 0; i < operations_.size(); i++)
{
Json::Value operation = Json::objectValue;
operations_[i]->Serialize(operation);
tmp.append(operation);
}
value[OPERATIONS] = tmp;
return true;
}
void SequenceOfOperationsJob::AwakeTrailingSleep()
{
operationAdded_.notify_one();
}
SequenceOfOperationsJob::SequenceOfOperationsJob(IJobUnserializer& unserializer,
const Json::Value& serialized) :
done_(false)
{
std::string jobType;
GetJobType(jobType);
if (SerializationToolbox::ReadString(serialized, TYPE) != jobType ||
!serialized.isMember(OPERATIONS) ||
serialized[OPERATIONS].type() != Json::arrayValue)
{
throw OrthancException(ErrorCode_BadFileFormat);
}
description_ = SerializationToolbox::ReadString(serialized, DESCRIPTION);
trailingTimeout_ = boost::posix_time::milliseconds
(SerializationToolbox::ReadUnsignedInteger(serialized, TRAILING_TIMEOUT));
current_ = SerializationToolbox::ReadUnsignedInteger(serialized, CURRENT);
const Json::Value& ops = serialized[OPERATIONS];
// Unserialize the individual operations
operations_.reserve(ops.size());
for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
{
operations_.push_back(new Operation(unserializer, i, ops[i]));
}
// Connect the next operations
for (Json::Value::ArrayIndex i = 0; i < ops.size(); i++)
{
if (!ops[i].isMember(NEXT_OPERATIONS) ||
ops[i][NEXT_OPERATIONS].type() != Json::arrayValue)
{
throw OrthancException(ErrorCode_BadFileFormat);
}
const Json::Value& next = ops[i][NEXT_OPERATIONS];
for (Json::Value::ArrayIndex j = 0; j < next.size(); j++)
{
if (next[j].type() != Json::intValue ||
next[j].asInt() < 0 ||
next[j].asUInt() >= operations_.size())
{
throw OrthancException(ErrorCode_BadFileFormat);
}
else
{
operations_[i]->AddNextOperation(*operations_[next[j].asUInt()], true);
}
}
}
}
}