Skip to main content

chat1

import polars as pl
from datetime import timedelta

def flag_offline_periods(propagate_indicator(df):
    # CountEnsure entriesthe perdataframe bucketis sorted by timestamp
    countsdf = df.groupby(sort('timestamp_bucket'timestamp').agg(pl.count('*').alias('count'))

  
    # Identify buckets with more than 20 entries
    high_count_buckets = counts.filter(pl.col('count') > 20)['timestamp_bucket']
    

    # Create a functionhelper column to generatetrack offlinewhere periodsindicator is 1
    defdf generate_offline_periods(high_count_buckets):= df.with_column(
        offline_periods(pl.col('indicator') = []
        for bucket in high_count_buckets:
            offline_end = bucket + pl.duration(seconds=30)
            offline_periods.append((bucket, offline_end)1).alias('start_indicator')
    return offline_periods)

    # GenerateCalculate offlinethe periodstimestamp up to which the value 1 should be propagated
    offline_periodsdf = generate_offline_periods(high_count_buckets)df.with_column(
        pl.when(pl.col('start_indicator'))
        .then(pl.col('timestamp') + timedelta(seconds=30))
        .otherwise(None)
        .alias('propagate_until')
    )

    # MergeForward overlappingfill periodsthe 'propagate_until' to get the maximum propagate time
    merged_periodsdf = []df.with_column(
        for start, end in sorted(offline_periods):
        if merged_periods and start <= merged_periods[-1][1]:
            merged_periods[-1] = (merged_periods[-1][0], max(merged_periods[-1][1], end)pl.col('propagate_until').fill_null('forward').alias('propagation_time')
    else:
)

          merged_periods.append((start, end))
    

    # CreateUse a window function to check if a timestamp is within offline periods
    def is_offline(timestamp):
        return any(start <= timestamp < end for start, end in merged_periods)
    
    # Applypropagate the offline check to each rowindicator
    df = df.with_columns(
        pl.when(pl.col('timestamp_bucket'timestamp').apply(is_offline) <= pl.col('propagation_time'))
        .then(pl.lit('OFFLINE'))1)
        .otherwise(pl.lit(col('ONLINE'indicator'))
        .alias('status'indicator_propagated')
    )

    # Drop helper columns
   
df = df.drop(['start_indicator', 'propagate_until', 'propagation_time'])

    return df

# Assuming yourExample DataFrame
data is= called{
    'df'timestamp': [pl.datetime("2023-01-01 00:00:00") + timedelta(seconds=i) for i in range(120)],
    'indicator': [1 if i in [0, 50, 70, 100] else None for i in range(120)]
}
df = flag_offline_periods(pl.DataFrame(data)

# Apply the function
result_df = propagate_indicator(df)
print(result_df)