Orthanc/OrthancServer/Sources/ServerIndex.cpp
2025-06-23 19:07:37 +05:30

615 lines
18 KiB
C++

/**
* Orthanc - A Lightweight, RESTful DICOM Store
* Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
* Department, University Hospital of Liege, Belgium
* Copyright (C) 2017-2023 Osimis S.A., Belgium
* Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
* Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
*
* This program is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**/
#include "PrecompiledHeadersServer.h"
#include "ServerIndex.h"
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include "../../OrthancFramework/Sources/Logging.h"
#include "../../OrthancFramework/Sources/Toolbox.h"
#include "OrthancConfiguration.h"
#include "ServerContext.h"
#include "ServerIndexChange.h"
#include "ServerToolbox.h"
namespace Orthanc
{
class ServerIndex::TransactionContext : public StatelessDatabaseOperations::ITransactionContext
{
private:
struct FileToRemove
{
private:
std::string uuid_;
std::string customData_;
FileContentType type_;
public:
explicit FileToRemove(const FileInfo& info) :
uuid_(info.GetUuid()),
customData_(info.GetCustomData()),
type_(info.GetContentType())
{
}
const std::string& GetUuid() const
{
return uuid_;
}
const std::string& GetCustomData() const
{
return customData_;
}
FileContentType GetContentType() const
{
return type_;
}
};
ServerContext& context_;
bool hasRemainingLevel_;
ResourceType remainingType_;
std::string remainingPublicId_;
std::list<FileToRemove> pendingFilesToRemove_;
std::list<ServerIndexChange> pendingChanges_;
uint64_t sizeOfFilesToRemove_;
uint64_t sizeOfAddedAttachments_;
void Reset()
{
sizeOfFilesToRemove_ = 0;
hasRemainingLevel_ = false;
remainingType_ = ResourceType_Instance; // dummy initialization
pendingFilesToRemove_.clear();
pendingChanges_.clear();
sizeOfAddedAttachments_ = 0;
}
void CommitFilesToRemove()
{
for (std::list<FileToRemove>::const_iterator
it = pendingFilesToRemove_.begin();
it != pendingFilesToRemove_.end(); ++it)
{
try
{
context_.RemoveFile(it->GetUuid(), it->GetContentType(), it->GetCustomData());
}
catch (OrthancException& e)
{
LOG(ERROR) << "Unable to remove an attachment from the storage area: "
<< it->GetUuid() << " (type: " << EnumerationToString(it->GetContentType()) << ")";
}
}
}
void CommitChanges()
{
for (std::list<ServerIndexChange>::const_iterator
it = pendingChanges_.begin();
it != pendingChanges_.end(); ++it)
{
context_.SignalChange(*it);
}
}
public:
explicit TransactionContext(ServerContext& context) :
context_(context)
{
Reset();
assert(ResourceType_Patient < ResourceType_Study &&
ResourceType_Study < ResourceType_Series &&
ResourceType_Series < ResourceType_Instance);
}
virtual void SignalRemainingAncestor(ResourceType parentType,
const std::string& publicId) ORTHANC_OVERRIDE
{
LOG(TRACE) << "Remaining ancestor \"" << publicId << "\" (" << parentType << ")";
if (hasRemainingLevel_)
{
if (parentType < remainingType_)
{
remainingType_ = parentType;
remainingPublicId_ = publicId;
}
}
else
{
hasRemainingLevel_ = true;
remainingType_ = parentType;
remainingPublicId_ = publicId;
}
}
virtual void SignalAttachmentDeleted(const FileInfo& info) ORTHANC_OVERRIDE
{
assert(Toolbox::IsUuid(info.GetUuid()));
pendingFilesToRemove_.push_back(FileToRemove(info));
sizeOfFilesToRemove_ += info.GetCompressedSize();
}
virtual void SignalResourceDeleted(ResourceType type,
const std::string& publicId) ORTHANC_OVERRIDE
{
SignalChange(ServerIndexChange(ChangeType_Deleted, type, publicId));
}
virtual void SignalChange(const ServerIndexChange& change) ORTHANC_OVERRIDE
{
LOG(TRACE) << "Change related to resource " << change.GetPublicId() << " of type "
<< EnumerationToString(change.GetResourceType()) << ": "
<< EnumerationToString(change.GetChangeType());
pendingChanges_.push_back(change);
}
virtual void SignalAttachmentsAdded(uint64_t compressedSize) ORTHANC_OVERRIDE
{
sizeOfAddedAttachments_ += compressedSize;
}
virtual bool LookupRemainingLevel(std::string& remainingPublicId /* out */,
ResourceType& remainingLevel /* out */) ORTHANC_OVERRIDE
{
if (hasRemainingLevel_)
{
remainingPublicId = remainingPublicId_;
remainingLevel = remainingType_;
return true;
}
else
{
return false;
}
};
virtual void MarkAsUnstable(ResourceType type,
int64_t id,
const std::string& publicId) ORTHANC_OVERRIDE
{
context_.GetIndex().MarkAsUnstable(type, id, publicId);
}
virtual bool IsUnstableResource(ResourceType type,
int64_t id) ORTHANC_OVERRIDE
{
return context_.GetIndex().IsUnstableResource(type, id);
}
virtual void Commit() ORTHANC_OVERRIDE
{
// We can remove the files once the SQLite transaction has
// been successfully committed. Some files might have to be
// deleted because of recycling.
CommitFilesToRemove();
// Send all the pending changes to the Orthanc plugins
CommitChanges();
}
virtual int64_t GetCompressedSizeDelta() ORTHANC_OVERRIDE
{
return (static_cast<int64_t>(sizeOfAddedAttachments_) -
static_cast<int64_t>(sizeOfFilesToRemove_));
}
};
class ServerIndex::TransactionContextFactory : public ITransactionContextFactory
{
private:
ServerContext& context_;
public:
explicit TransactionContextFactory(ServerContext& context) :
context_(context)
{
}
virtual ITransactionContext* Create()
{
// There can be concurrent calls to this method, which is not an
// issue because we simply create an object
return new TransactionContext(context_);
}
};
class ServerIndex::UnstableResourcePayload
{
private:
std::string publicId_;
boost::posix_time::ptime time_;
public:
UnstableResourcePayload()
{
}
explicit UnstableResourcePayload(const std::string& publicId) :
publicId_(publicId),
time_(boost::posix_time::second_clock::local_time())
{
}
unsigned int GetAge() const
{
return (boost::posix_time::second_clock::local_time() - time_).total_seconds();
}
const std::string& GetPublicId() const
{
return publicId_;
}
};
void ServerIndex::FlushThread(ServerIndex* that,
unsigned int threadSleepGranularityMilliseconds)
{
Logging::SetCurrentThreadName("DB-FLUSH");
// By default, wait for 10 seconds before flushing
static const unsigned int SLEEP_SECONDS = 10;
if (threadSleepGranularityMilliseconds > 1000)
{
throw OrthancException(ErrorCode_ParameterOutOfRange);
}
LOG(INFO) << "Starting the database flushing thread (sleep = " << SLEEP_SECONDS << " seconds)";
unsigned int count = 0;
unsigned int countThreshold = (1000 * SLEEP_SECONDS) / threadSleepGranularityMilliseconds;
while (!that->done_)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds));
count++;
if (count >= countThreshold)
{
Logging::Flush();
that->FlushToDisk();
count = 0;
}
}
LOG(INFO) << "Stopping the database flushing thread";
}
bool ServerIndex::IsUnstableResource(ResourceType type,
int64_t id)
{
boost::mutex::scoped_lock lock(monitoringMutex_);
return unstableResources_.Contains(std::make_pair(type, id));
}
ServerIndex::ServerIndex(ServerContext& context,
IDatabaseWrapper& db,
unsigned int threadSleepGranularityMilliseconds,
bool readOnly) :
StatelessDatabaseOperations(db, readOnly),
done_(false),
maximumStorageMode_(MaxStorageMode_Recycle),
maximumStorageSize_(0),
maximumPatients_(0),
readOnly_(readOnly)
{
SetTransactionContextFactory(new TransactionContextFactory(context));
// Initial recycling if the parameters have changed since the last
// execution of Orthanc
if (!readOnly)
{
StandaloneRecycling(maximumStorageMode_, maximumStorageSize_, maximumPatients_);
}
// For some DB engines (like SQLite), make sure we flush the DB to disk at regular interval
if (GetDatabaseCapabilities().HasFlushToDisk())
{
if (readOnly)
{
LOG(WARNING) << "READ-ONLY SYSTEM: not starting the flush disk thread";
}
else
{
flushThread_ = boost::thread(FlushThread, this, threadSleepGranularityMilliseconds);
}
}
if (readOnly)
{
LOG(WARNING) << "READ-ONLY SYSTEM: not starting the unstable resources monitor thread";
}
else
{
unstableResourcesMonitorThread_ = boost::thread
(UnstableResourcesMonitorThread, this, threadSleepGranularityMilliseconds);
}
}
ServerIndex::~ServerIndex()
{
if (!done_)
{
LOG(ERROR) << "INTERNAL ERROR: ServerIndex::Stop() should be invoked manually to avoid mess in the destruction order!";
Stop();
}
}
void ServerIndex::Stop()
{
if (!done_)
{
done_ = true;
if (flushThread_.joinable())
{
flushThread_.join();
}
if (unstableResourcesMonitorThread_.joinable())
{
unstableResourcesMonitorThread_.join();
}
}
}
void ServerIndex::SetMaximumPatientCount(unsigned int count)
{
{
boost::mutex::scoped_lock lock(monitoringMutex_);
maximumPatients_ = count;
if (count == 0)
{
LOG(WARNING) << "No limit on the number of stored patients";
}
else
{
LOG(WARNING) << "At most " << count << " patients will be stored";
}
}
StandaloneRecycling(maximumStorageMode_, maximumStorageSize_, maximumPatients_);
}
void ServerIndex::SetMaximumStorageSize(uint64_t size)
{
{
boost::mutex::scoped_lock lock(monitoringMutex_);
maximumStorageSize_ = size;
if (size == 0)
{
LOG(WARNING) << "No limit on the size of the storage area";
}
else
{
LOG(WARNING) << "At most " << Toolbox::GetHumanFileSize(size) << " will be used for the storage area";
}
}
StandaloneRecycling(maximumStorageMode_, maximumStorageSize_, maximumPatients_);
}
void ServerIndex::SetMaximumStorageMode(MaxStorageMode mode)
{
{
boost::mutex::scoped_lock lock(monitoringMutex_);
maximumStorageMode_ = mode;
if (mode == MaxStorageMode_Recycle)
{
if (maximumStorageSize_ > 0 || maximumPatients_ > 0)
{
LOG(WARNING) << "Maximum Storage mode: Recycle";
}
}
else
{
if (maximumStorageSize_ > 0 || maximumPatients_ > 0)
{
LOG(WARNING) << "Maximum Storage mode: Reject";
}
}
}
StandaloneRecycling(maximumStorageMode_, maximumStorageSize_, maximumPatients_);
}
void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that,
unsigned int threadSleepGranularityMilliseconds)
{
Logging::SetCurrentThreadName("UNSTABLE-MON");
int stableAge;
{
OrthancConfiguration::ReaderLock lock;
stableAge = lock.GetConfiguration().GetUnsignedIntegerParameter("StableAge", 60);
}
if (stableAge <= 0)
{
stableAge = 60;
}
LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")";
while (!that->done_)
{
// Check for stable resources each few seconds
boost::this_thread::sleep(boost::posix_time::milliseconds(threadSleepGranularityMilliseconds));
for (;;)
{
UnstableResourcePayload stablePayload;
ResourceType stableLevel;
int64_t stableId;
{
boost::mutex::scoped_lock lock(that->monitoringMutex_);
if (!that->unstableResources_.IsEmpty() &&
that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge))
{
// This DICOM resource has not received any new instance for
// some time. It can be considered as stable.
std::pair<ResourceType, int64_t> stableResource = that->unstableResources_.RemoveOldest(stablePayload);
stableLevel = stableResource.first;
stableId = stableResource.second;
//LOG(TRACE) << "Stable resource: " << EnumerationToString(stablePayload.GetResourceType()) << " " << stableId;
}
else
{
// No more stable DICOM resource, leave the internal loop
break;
}
}
try
{
/**
* WARNING: Don't protect the calls to "LogChange()" using
* "monitoringMutex_", as this could lead to deadlocks in
* other threads (typically, if "Store()" is being running in
* another thread, which leads to calls to "MarkAsUnstable()",
* which leads to two lockings of "monitoringMutex_").
**/
switch (stableLevel)
{
case ResourceType_Patient:
that->LogChange(stableId, ChangeType_StablePatient, stablePayload.GetPublicId(), ResourceType_Patient);
break;
case ResourceType_Study:
that->LogChange(stableId, ChangeType_StableStudy, stablePayload.GetPublicId(), ResourceType_Study);
break;
case ResourceType_Series:
that->LogChange(stableId, ChangeType_StableSeries, stablePayload.GetPublicId(), ResourceType_Series);
break;
default:
throw OrthancException(ErrorCode_InternalError);
}
}
catch (OrthancException& e)
{
LOG(ERROR) << "Cannot log a change about a stable resource into the database";
}
}
}
LOG(INFO) << "Closing the monitor thread for stable resources";
}
void ServerIndex::MarkAsUnstable(ResourceType type,
int64_t id,
const std::string& publicId)
{
assert(type == ResourceType_Patient ||
type == ResourceType_Study ||
type == ResourceType_Series);
{
boost::mutex::scoped_lock lock(monitoringMutex_);
UnstableResourcePayload payload(publicId);
unstableResources_.AddOrMakeMostRecent(std::make_pair(type, id), payload);
//LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id;
}
}
StoreStatus ServerIndex::Store(std::map<MetadataType, std::string>& instanceMetadata,
const DicomMap& dicomSummary,
const ServerIndex::Attachments& attachments,
const ServerIndex::MetadataMap& metadata,
const DicomInstanceOrigin& origin,
bool overwrite,
bool hasTransferSyntax,
DicomTransferSyntax transferSyntax,
bool hasPixelDataOffset,
uint64_t pixelDataOffset,
ValueRepresentation pixelDataVR,
bool isReconstruct)
{
uint64_t maximumStorageSize;
unsigned int maximumPatients;
MaxStorageMode maximumStorageMode;
{
boost::mutex::scoped_lock lock(monitoringMutex_);
maximumStorageSize = maximumStorageSize_;
maximumPatients = maximumPatients_;
maximumStorageMode = maximumStorageMode_;
}
return StatelessDatabaseOperations::Store(
instanceMetadata, dicomSummary, attachments, metadata, origin, overwrite, hasTransferSyntax,
transferSyntax, hasPixelDataOffset, pixelDataOffset, pixelDataVR, maximumStorageMode,
maximumStorageSize, maximumPatients, isReconstruct);
}
StoreStatus ServerIndex::AddAttachment(int64_t& newRevision,
const FileInfo& attachment,
const std::string& publicId,
bool hasOldRevision,
int64_t oldRevision,
const std::string& oldMD5)
{
uint64_t maximumStorageSize;
unsigned int maximumPatients;
{
boost::mutex::scoped_lock lock(monitoringMutex_);
maximumStorageSize = maximumStorageSize_;
maximumPatients = maximumPatients_;
}
return StatelessDatabaseOperations::AddAttachment(
newRevision, attachment, publicId, maximumStorageSize, maximumPatients,
hasOldRevision, oldRevision, oldMD5);
}
}