Skip to content
Merged
47 changes: 41 additions & 6 deletions core/common/TimeUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include <atomic>
#include <chrono>
#ifdef APSARA_UNIT_TEST_MAIN
#include <functional>
#endif
#include <limits>
#if defined(__linux__)
#include <ctime>
Expand All @@ -35,6 +38,14 @@ namespace logtail {

const std::string PRECISE_TIMESTAMP_DEFAULT_KEY = "precise_timestamp";

#ifdef APSARA_UNIT_TEST_MAIN
thread_local std::function<uint64_t()> gCurrentTimeNs = []() -> uint64_t {
return static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch())
.count());
};
#endif

std::string ConvertToTimeStamp(const time_t& t, const std::string& format) {
return GetTimeStamp(t, format);
}
Expand All @@ -59,14 +70,33 @@ std::string GetTimeStamp(time_t t, const std::string& format, bool isLocal) {
return (0 == ret) ? "" : std::string(buf, ret);
}

uint64_t GetCurrentTimeInSeconds() {
#ifdef APSARA_UNIT_TEST_MAIN
return gCurrentTimeNs() / 1000000000ULL;
#else
return static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count());
#endif
Comment thread
yyuuttaaoo marked this conversation as resolved.
}

uint64_t GetCurrentTimeInMicroSeconds() {
return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
#ifdef APSARA_UNIT_TEST_MAIN
return gCurrentTimeNs() / 1000ULL;
Comment thread
Takuka0311 marked this conversation as resolved.
#else
return static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch())
.count());
#endif
}

uint64_t GetCurrentTimeInMilliSeconds() {
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
#ifdef APSARA_UNIT_TEST_MAIN
return gCurrentTimeNs() / 1000000ULL;
#else
return static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
.count());
#endif
}

int GetLocalTimeZoneOffsetSecond() {
Expand Down Expand Up @@ -365,8 +395,13 @@ uint64_t GetPreciseTimestamp(uint64_t secondTimestamp,
}

uint64_t GetCurrentTimeInNanoSeconds() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
#ifdef APSARA_UNIT_TEST_MAIN
return gCurrentTimeNs();
#else
return static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch())
.count());
#endif
}

bool ParseTimeZoneOffsetSecond(const std::string& logTZ, int& logTZSecond) {
Expand Down
33 changes: 32 additions & 1 deletion core/common/TimeUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
#pragma once
#include <ctime>

#include <chrono>
#include <optional>
#include <string>
#include <thread>

#ifdef APSARA_UNIT_TEST_MAIN
#include <functional>
#endif

#include "common/Strptime.h"
#include "protobuf/sls/sls_logs.pb.h"

Expand All @@ -41,11 +46,37 @@ struct PreciseTimestampConfig {

typedef timespec LogtailTime;

#ifdef APSARA_UNIT_TEST_MAIN
// Injectable clock for deterministic unit tests. Returns nanoseconds since Unix epoch.
// thread_local ensures background threads spawned by pipelines always use the real clock,
// while the test thread can safely override without data races.
extern thread_local std::function<uint64_t()> gCurrentTimeNs;

// RAII guard that temporarily pins the clock to a fixed value for the duration
// of a test and restores the previous value on destruction.
// Safe even when pipelines start background threads — each thread has its own copy.
struct ScopedClockOverride {
explicit ScopedClockOverride(std::chrono::system_clock::time_point fixedNow) : mPrev(gCurrentTimeNs) {
const uint64_t fixedNs = static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::nanoseconds>(fixedNow.time_since_epoch()).count());
gCurrentTimeNs = [fixedNs]() -> uint64_t { return fixedNs; };
Comment thread
yyuuttaaoo marked this conversation as resolved.
}
~ScopedClockOverride() { gCurrentTimeNs = mPrev; }

ScopedClockOverride(const ScopedClockOverride&) = delete;
ScopedClockOverride& operator=(const ScopedClockOverride&) = delete;

private:
std::function<uint64_t()> mPrev;
};
#endif

// Convert @tm to string according to @format. TODO: Merge ConvertToTimeStamp and GetTimeStamp.
std::string ConvertToTimeStamp(const time_t& tm, const std::string& format = "%Y%m%d%H%M%S");
std::string GetTimeStamp(time_t tm, const std::string& format = "%Y%m%d%H%M%S", bool isLocal = true);

// Get current time in us or ms.
// Get current time in s, us, ms, or ns.
uint64_t GetCurrentTimeInSeconds();
uint64_t GetCurrentTimeInMicroSeconds();
uint64_t GetCurrentTimeInMilliSeconds();
uint64_t GetCurrentTimeInNanoSeconds();
Expand Down
5 changes: 3 additions & 2 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "config/PipelineConfig.h"

#include "common/JsonUtil.h"
#include "common/TimeUtil.h"
#include "config/OnetimeConfigInfoManager.h"
#include "logger/Logger.h"

Expand Down Expand Up @@ -96,12 +97,12 @@ bool PipelineConfig::GetExpireTimeIfOneTime(const Json::Value& global) {
return true;
case OnetimeConfigStatus::NEW:
// NEW状态表示是新配置,或已有配置Rerun了
mOnetimeStartTime = time(nullptr);
mOnetimeStartTime = static_cast<uint32_t>(GetCurrentTimeInSeconds());
mOnetimeExpireTime = mOnetimeStartTime.value() + mExcutionTimeout;
return true;
case OnetimeConfigStatus::UPDATED:
// UPDATED状态表示配置hash改变但input hash未变,保持原有checkpoint,但是更新过期时间
mOnetimeStartTime = time(nullptr);
mOnetimeStartTime = static_cast<uint32_t>(GetCurrentTimeInSeconds());
mOnetimeExpireTime = mOnetimeStartTime.value() + mExcutionTimeout;
mIsRunningBeforeStart = true;
LOG_INFO(sLogger,
Expand Down
37 changes: 31 additions & 6 deletions core/unittest/config/OnetimeConfigUpdateUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "collection_pipeline/CollectionPipelineManager.h"
#include "common/JsonUtil.h"
#include "common/TimeUtil.h"
#include "config/OnetimeConfigInfoManager.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "unittest/Unittest.h"
Expand Down Expand Up @@ -238,20 +239,32 @@ void OnetimeConfigUpdateUnittest::OnCollectionConfigUpdate() const {

auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
APSARA_TEST_TRUE(diff.first.HasDiff());
CollectionPipelineManager::GetInstance()->UpdatePipelines(diff.first);
auto restartNow = std::chrono::system_clock::now();
{
ScopedClockOverride clockGuard(restartNow);
CollectionPipelineManager::GetInstance()->UpdatePipelines(diff.first);
}
sConfigManager->DumpCheckpointFile();

APSARA_TEST_EQUAL(3U, sConfigManager->mConfigInfoMap.size());
{
const auto& item = sConfigManager->mConfigInfoMap.at("new_config");
APSARA_TEST_EQUAL(time(nullptr) + 3600U, item.mExpireTime);
APSARA_TEST_EQUAL(
static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::seconds>(restartNow.time_since_epoch()).count())
+ 3600U,
item.mExpireTime);
APSARA_TEST_EQUAL(configHash["new_config.json"], item.mConfigHash);
APSARA_TEST_EQUAL(ConfigType::Collection, item.mType);
APSARA_TEST_EQUAL(mConfigDir / filenames[0], item.mFilepath);
}
{
const auto& item = sConfigManager->mConfigInfoMap.at("changed_config");
APSARA_TEST_EQUAL(time(nullptr) + 7200U, item.mExpireTime);
APSARA_TEST_EQUAL(
static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::seconds>(restartNow.time_since_epoch()).count())
+ 7200U,
item.mExpireTime);
APSARA_TEST_EQUAL(configHash["changed_config.json"], item.mConfigHash);
APSARA_TEST_EQUAL(ConfigType::Collection, item.mType);
APSARA_TEST_EQUAL(mConfigDir / filenames[1], item.mFilepath);
Expand Down Expand Up @@ -333,20 +346,32 @@ void OnetimeConfigUpdateUnittest::OnCollectionConfigUpdate() const {

auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
APSARA_TEST_TRUE(diff.first.HasDiff());
CollectionPipelineManager::GetInstance()->UpdatePipelines(diff.first);
auto updateNow = std::chrono::system_clock::now();
{
ScopedClockOverride clockGuard(updateNow);
CollectionPipelineManager::GetInstance()->UpdatePipelines(diff.first);
}
sConfigManager->DumpCheckpointFile();

APSARA_TEST_EQUAL(3U, sConfigManager->mConfigInfoMap.size());
{
const auto& item = sConfigManager->mConfigInfoMap.at("new_config");
APSARA_TEST_EQUAL(time(nullptr) + 1000U, item.mExpireTime);
APSARA_TEST_EQUAL(
static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::seconds>(updateNow.time_since_epoch()).count())
+ 1000U,
item.mExpireTime);
APSARA_TEST_EQUAL(configHash["new_config.json"], item.mConfigHash);
APSARA_TEST_EQUAL(ConfigType::Collection, item.mType);
APSARA_TEST_EQUAL(mConfigDir / filenames[0], item.mFilepath);
}
{
const auto& item = sConfigManager->mConfigInfoMap.at("old_config");
APSARA_TEST_EQUAL(time(nullptr) + 1200U, item.mExpireTime);
APSARA_TEST_EQUAL(
static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::seconds>(updateNow.time_since_epoch()).count())
+ 1200U,
item.mExpireTime);
APSARA_TEST_EQUAL(configHash["old_config.json"], item.mConfigHash);
APSARA_TEST_EQUAL(ConfigType::Collection, item.mType);
APSARA_TEST_EQUAL(mConfigDir / filenames[1], item.mFilepath);
Expand Down
29 changes: 25 additions & 4 deletions core/unittest/config/PipelineConfigUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "common/TimeUtil.h"
#include "config/OnetimeConfigInfoManager.h"
#include "config/PipelineConfig.h"
#include "logger/Logger.h"
Expand Down Expand Up @@ -71,29 +72,44 @@ void PipelineConfigUnittest::TestOnetimeConfig() const {
auto configJson = make_unique<Json::Value>();
(*configJson)["global"]["ExcutionTimeout"] = true;

auto now = std::chrono::system_clock::now();
ScopedClockOverride clockGuard(now);
ConfigMock config("test", std::move(configJson), filepath);
APSARA_TEST_TRUE(config.GetExpireTimeIfOneTime((*config.mDetail)["global"]));
APSARA_TEST_EQUAL(time(nullptr) + 604800U, config.mOnetimeExpireTime);
APSARA_TEST_EQUAL(
static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count())
+ 604800U,
config.mOnetimeExpireTime);
Comment thread
yyuuttaaoo marked this conversation as resolved.
APSARA_TEST_FALSE(config.mIsRunningBeforeStart);
}
{
// ExcutionTimeout too small
auto configJson = make_unique<Json::Value>();
(*configJson)["global"]["ExcutionTimeout"] = 1U;

auto now = std::chrono::system_clock::now();
ScopedClockOverride clockGuard(now);
ConfigMock config("test", std::move(configJson), filepath);
APSARA_TEST_TRUE(config.GetExpireTimeIfOneTime((*config.mDetail)["global"]));
APSARA_TEST_EQUAL(time(nullptr) + 600U, config.mOnetimeExpireTime);
APSARA_TEST_EQUAL(
static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count())
+ 600U,
config.mOnetimeExpireTime);
APSARA_TEST_FALSE(config.mIsRunningBeforeStart);
}
{
// ExcutionTimeout too large
auto configJson = make_unique<Json::Value>();
(*configJson)["global"]["ExcutionTimeout"] = 1000000U;

auto now = std::chrono::system_clock::now();
ScopedClockOverride clockGuard(now);
ConfigMock config("test", std::move(configJson), filepath);
APSARA_TEST_TRUE(config.GetExpireTimeIfOneTime((*config.mDetail)["global"]));
APSARA_TEST_EQUAL(time(nullptr) + 604800U, config.mOnetimeExpireTime);
APSARA_TEST_EQUAL(
static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count())
+ 604800U,
config.mOnetimeExpireTime);
APSARA_TEST_FALSE(config.mIsRunningBeforeStart);
}

Expand Down Expand Up @@ -142,9 +158,14 @@ void PipelineConfigUnittest::TestOnetimeConfig() const {
auto configJson = make_unique<Json::Value>();
(*configJson)["global"]["ExcutionTimeout"] = 3600U;

auto now = std::chrono::system_clock::now();
ScopedClockOverride clockGuard(now);
ConfigMock config("new_config", std::move(configJson), filepath);
APSARA_TEST_TRUE(config.GetExpireTimeIfOneTime((*config.mDetail)["global"]));
APSARA_TEST_EQUAL(time(nullptr) + 3600U, config.mOnetimeExpireTime);
APSARA_TEST_EQUAL(
static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count())
+ 3600U,
config.mOnetimeExpireTime);
APSARA_TEST_FALSE(config.mIsRunningBeforeStart);
APSARA_TEST_EQUAL(sConfigManager->mConfigCheckpointMap.end(),
sConfigManager->mConfigCheckpointMap.find("new_config"));
Expand Down
Loading