Skip to content

Python API

This package is primarily a CLI. The core modules are documented below for advanced users.

Config

Config dataclass

Manages configuration for the trading bot, loading from a YAML file and environment variables.

Source code in src/cc_liquid/config.py
@dataclass
class Config:
    """
    Manages configuration for the trading bot, loading from a YAML file
    and environment variables.
    """

    # Private Keys and API Credentials (from .env)
    CROWDCENT_API_KEY: str | None = None
    HYPERLIQUID_PRIVATE_KEY: str | None = (
        None  # Private key for signing (owner or approved agent wallet)
    )

    # Environment
    is_testnet: bool = False
    base_url: str = "https://api.hyperliquid.xyz"

    # Profiles (addresses in config; secrets remain in env)
    active_profile: str | None = "default"
    profiles: dict[str, Any] = field(default_factory=dict)

    # Resolved addresses from active profile (owner/vault)
    HYPERLIQUID_ADDRESS: str | None = None
    HYPERLIQUID_VAULT_ADDRESS: str | None = None

    # Nested Configs
    data: DataSourceConfig = field(default_factory=DataSourceConfig)
    portfolio: PortfolioConfig = field(default_factory=PortfolioConfig)
    execution: ExecutionConfig = field(default_factory=ExecutionConfig)

    def __post_init__(self):
        """Load environment variables and YAML config after initialization."""
        self._load_env_vars()
        self._load_yaml_config()
        self._resolve_profile()  # Must come AFTER loading YAML (which loads profiles)
        self._set_base_url()
        self._validate()

    def _load_env_vars(self):
        """Load secrets-only from .env; addresses come from YAML/profiles."""
        load_dotenv()
        self.CROWDCENT_API_KEY = os.getenv("CROWDCENT_API_KEY")
        # Don't load private key here - will be resolved based on profile's signer_env

    def _load_yaml_config(self, config_path: str | None = None):
        """Loads and overrides config from a YAML file."""
        path = config_path or DEFAULT_CONFIG_PATH
        if os.path.exists(path):
            with open(path) as f:
                yaml_config: dict[str, Any] = yaml.safe_load(f) or {}

            for key, value in yaml_config.items():
                if hasattr(self, key):
                    # Handle nested dataclasses
                    if isinstance(value, dict) and is_dataclass(getattr(self, key)):
                        nested_config_obj = getattr(self, key)
                        for nested_key, nested_value in value.items():
                            if hasattr(nested_config_obj, nested_key):
                                # Handle double nested dataclasses
                                if isinstance(nested_value, dict) and is_dataclass(
                                    getattr(nested_config_obj, nested_key)
                                ):
                                    nested_dataclass = getattr(
                                        nested_config_obj, nested_key
                                    )
                                    for deep_key, deep_value in nested_value.items():
                                        if hasattr(nested_dataclass, deep_key):
                                            setattr(
                                                nested_dataclass, deep_key, deep_value
                                            )
                                else:
                                    setattr(nested_config_obj, nested_key, nested_value)
                    else:
                        # Direct assignment for non-dataclass fields (like profiles dict)
                        setattr(self, key, value)

    def _set_base_url(self):
        """Sets the base URL based on the is_testnet flag."""
        if self.is_testnet:
            self.base_url = "https://api.hyperliquid-testnet.xyz"

    def _resolve_profile(self):
        """Resolve owner/vault addresses and signer key from the active profile."""
        # If no profiles defined, skip resolution
        if not self.profiles:
            return

        active = self.active_profile or "default"
        profile = self.profiles.get(active, {})

        # Extract owner and vault from profile
        owner = profile.get("owner")
        vault = profile.get("vault")

        # Set addresses (owner is required, vault is optional)
        self.HYPERLIQUID_ADDRESS = owner
        self.HYPERLIQUID_VAULT_ADDRESS = vault

        # Resolve signer key from environment based on profile's signer_env
        signer_env = profile.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
        self.HYPERLIQUID_PRIVATE_KEY = os.getenv(signer_env)

        # Fallback to default env var if custom signer_env not found
        if not self.HYPERLIQUID_PRIVATE_KEY and signer_env != "HYPERLIQUID_PRIVATE_KEY":
            self.HYPERLIQUID_PRIVATE_KEY = os.getenv("HYPERLIQUID_PRIVATE_KEY")

    def refresh_runtime(self):
        """Refresh runtime configuration after changes (e.g., CLI overrides)."""
        self._set_base_url()
        self._resolve_profile()
        self._validate()

    def _validate(self):
        """Validate that required configuration is present.

        Note: This is lenient during initial module load to allow CLI commands
        like 'profile list' to work even without complete setup.
        """
        # Check if active profile exists
        if self.profiles and self.active_profile:
            if self.active_profile not in self.profiles:
                available = ", ".join(sorted(self.profiles.keys()))
                raise ValueError(
                    f"Active profile '{self.active_profile}' not found. Available profiles: {available}"
                )

        # Don't validate addresses/keys during module import - let individual commands handle it
        # This allows 'profile list', 'config', etc to work without full setup

    def validate_for_trading(self):
        """Strict validation for trading operations.

        Call this before any trading operations to ensure all required config is present.
        """
        # Validate we have required addresses from profile
        if not self.HYPERLIQUID_ADDRESS and not self.HYPERLIQUID_VAULT_ADDRESS:
            raise ValueError(
                "Profile must specify 'owner' or 'vault' address in cc-liquid-config.yaml"
            )

        # Validate we have a private key
        if not self.HYPERLIQUID_PRIVATE_KEY:
            # Better error message showing which env var is expected
            signer_env = "HYPERLIQUID_PRIVATE_KEY"
            if self.profiles and self.active_profile:
                profile = self.profiles.get(self.active_profile, {})
                signer_env = profile.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
            raise ValueError(
                f"Private key not found. Set '{signer_env}' in your .env file."
            )

        # Validate order type
        if self.execution.order_type not in ("market", "limit"):
            raise ValueError(
                f"Invalid order_type: {self.execution.order_type}. Must be 'market' or 'limit'"
            )

        # Validate time in force
        if self.execution.time_in_force not in ("Ioc", "Gtc", "Alo"):
            raise ValueError(
                f"Invalid time_in_force: {self.execution.time_in_force}. Must be 'Ioc', 'Gtc', or 'Alo'"
            )

    def to_dict(self) -> dict[str, Any]:
        """Return a dictionary representation of the config."""
        portfolio_dict = self.portfolio.__dict__.copy()
        # Convert nested dataclasses to dict
        if hasattr(self.portfolio, "rebalancing"):
            portfolio_dict["rebalancing"] = self.portfolio.rebalancing.__dict__
        if hasattr(self.portfolio, "stop_loss"):
            portfolio_dict["stop_loss"] = self.portfolio.stop_loss.__dict__

        # Profile summary (non-secret)
        active_profile = self.active_profile
        prof = self.profiles.get(active_profile) if self.profiles else {}
        signer_env_name = (
            prof.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
            if prof
            else "HYPERLIQUID_PRIVATE_KEY"
        )
        profile_dict = {
            "active": active_profile,
            "owner": self.HYPERLIQUID_ADDRESS,
            "vault": self.HYPERLIQUID_VAULT_ADDRESS,
            "signer_env": signer_env_name,
        }

        return {
            "is_testnet": self.is_testnet,
            "profile": profile_dict,
            "data": self.data.__dict__,
            "portfolio": portfolio_dict,
            "execution": self.execution.__dict__,
        }

__post_init__()

Load environment variables and YAML config after initialization.

Source code in src/cc_liquid/config.py
def __post_init__(self):
    """Load environment variables and YAML config after initialization."""
    self._load_env_vars()
    self._load_yaml_config()
    self._resolve_profile()  # Must come AFTER loading YAML (which loads profiles)
    self._set_base_url()
    self._validate()

refresh_runtime()

Refresh runtime configuration after changes (e.g., CLI overrides).

Source code in src/cc_liquid/config.py
def refresh_runtime(self):
    """Refresh runtime configuration after changes (e.g., CLI overrides)."""
    self._set_base_url()
    self._resolve_profile()
    self._validate()

to_dict()

Return a dictionary representation of the config.

Source code in src/cc_liquid/config.py
def to_dict(self) -> dict[str, Any]:
    """Return a dictionary representation of the config."""
    portfolio_dict = self.portfolio.__dict__.copy()
    # Convert nested dataclasses to dict
    if hasattr(self.portfolio, "rebalancing"):
        portfolio_dict["rebalancing"] = self.portfolio.rebalancing.__dict__
    if hasattr(self.portfolio, "stop_loss"):
        portfolio_dict["stop_loss"] = self.portfolio.stop_loss.__dict__

    # Profile summary (non-secret)
    active_profile = self.active_profile
    prof = self.profiles.get(active_profile) if self.profiles else {}
    signer_env_name = (
        prof.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
        if prof
        else "HYPERLIQUID_PRIVATE_KEY"
    )
    profile_dict = {
        "active": active_profile,
        "owner": self.HYPERLIQUID_ADDRESS,
        "vault": self.HYPERLIQUID_VAULT_ADDRESS,
        "signer_env": signer_env_name,
    }

    return {
        "is_testnet": self.is_testnet,
        "profile": profile_dict,
        "data": self.data.__dict__,
        "portfolio": portfolio_dict,
        "execution": self.execution.__dict__,
    }

validate_for_trading()

Strict validation for trading operations.

Call this before any trading operations to ensure all required config is present.

Source code in src/cc_liquid/config.py
def validate_for_trading(self):
    """Strict validation for trading operations.

    Call this before any trading operations to ensure all required config is present.
    """
    # Validate we have required addresses from profile
    if not self.HYPERLIQUID_ADDRESS and not self.HYPERLIQUID_VAULT_ADDRESS:
        raise ValueError(
            "Profile must specify 'owner' or 'vault' address in cc-liquid-config.yaml"
        )

    # Validate we have a private key
    if not self.HYPERLIQUID_PRIVATE_KEY:
        # Better error message showing which env var is expected
        signer_env = "HYPERLIQUID_PRIVATE_KEY"
        if self.profiles and self.active_profile:
            profile = self.profiles.get(self.active_profile, {})
            signer_env = profile.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
        raise ValueError(
            f"Private key not found. Set '{signer_env}' in your .env file."
        )

    # Validate order type
    if self.execution.order_type not in ("market", "limit"):
        raise ValueError(
            f"Invalid order_type: {self.execution.order_type}. Must be 'market' or 'limit'"
        )

    # Validate time in force
    if self.execution.time_in_force not in ("Ioc", "Gtc", "Alo"):
        raise ValueError(
            f"Invalid time_in_force: {self.execution.time_in_force}. Must be 'Ioc', 'Gtc', or 'Alo'"
        )

Data loading

DataLoader

Factory for creating data sources.

Source code in src/cc_liquid/data_loader.py
class DataLoader:
    """Factory for creating data sources."""

    @staticmethod
    def from_file(path: str, date_col: str, id_col: str, pred_col: str) -> pl.DataFrame:
        """Create a file data source and load data."""
        return FileDataSource(
            path,
            date_column=date_col,
            asset_id_column=id_col,
            prediction_column=pred_col,
        ).load()

    @staticmethod
    def from_dataframe(
        df: pl.DataFrame, date_col: str, id_col: str, pred_col: str
    ) -> pl.DataFrame:
        """Create a DataFrame data source and load data."""
        return DataFrameDataSource(
            df,
            date_column=date_col,
            asset_id_column=id_col,
            prediction_column=pred_col,
        ).load()

    @staticmethod
    def from_crowdcent_api(
        api_key: str | None = None,
        challenge_slug: str = "hyperliquid-ranking",
        download_path: str | None = None,
        date_col: str = "release_date",
        id_col: str = "id",
        pred_col: str = "pred_10d",
    ) -> pl.DataFrame:
        """
        Download and load the CrowdCent meta model.

        Args:
            api_key: CrowdCent API key (if None, will try to load from env)
            challenge_slug: The challenge to download data for
            download_path: Optional path to save the downloaded file
            date_col: Date column name in the meta model
            id_col: Asset ID column name in the meta model
            pred_col: Prediction column name to use from the meta model

        Returns:
            Polars DataFrame with original column names
        """
        from crowdcent_challenge import ChallengeClient

        if api_key is None:
            import os

            api_key = os.getenv("CROWDCENT_API_KEY")
            if not api_key:
                raise ValueError("CROWDCENT_API_KEY not found in environment variables")

        client = ChallengeClient(challenge_slug=challenge_slug, api_key=api_key)

        if download_path is None:
            download_path = "predictions.parquet"

        client.download_meta_model(download_path)

        return DataLoader.from_file(
            path=download_path, date_col=date_col, id_col=id_col, pred_col=pred_col
        )

    @staticmethod
    def from_numerai_api(
        download_path: str | None = None,
        date_col: str = "date",
        id_col: str = "symbol",
        pred_col: str = "meta_model",
    ) -> pl.DataFrame:
        """
        Download and load the Numerai crypto meta model.

        Args:
            download_path: Optional path to save the downloaded file
            date_col: Date column name in the meta model
            id_col: Asset ID/symbol column name in the meta model
            pred_col: Prediction column name to use from the meta model

        Returns:
            Polars DataFrame with original column names
        """
        try:
            from numerapi import CryptoAPI
        except ImportError:
            raise ImportError(
                "numerapi is required. Install with: uv add cc-liquid[numerai]"
            )

        api = CryptoAPI()

        if download_path is None:
            download_path = "predictions.parquet"

        api.download_dataset("v1.0/historical_meta_models.parquet", download_path)

        return DataLoader.from_file(
            path=download_path, date_col=date_col, id_col=id_col, pred_col=pred_col
        )

from_crowdcent_api(api_key=None, challenge_slug='hyperliquid-ranking', download_path=None, date_col='release_date', id_col='id', pred_col='pred_10d') staticmethod

Download and load the CrowdCent meta model.

Parameters:

Name Type Description Default
api_key str | None

CrowdCent API key (if None, will try to load from env)

None
challenge_slug str

The challenge to download data for

'hyperliquid-ranking'
download_path str | None

Optional path to save the downloaded file

None
date_col str

Date column name in the meta model

'release_date'
id_col str

Asset ID column name in the meta model

'id'
pred_col str

Prediction column name to use from the meta model

'pred_10d'

Returns:

Type Description
DataFrame

Polars DataFrame with original column names

Source code in src/cc_liquid/data_loader.py
@staticmethod
def from_crowdcent_api(
    api_key: str | None = None,
    challenge_slug: str = "hyperliquid-ranking",
    download_path: str | None = None,
    date_col: str = "release_date",
    id_col: str = "id",
    pred_col: str = "pred_10d",
) -> pl.DataFrame:
    """
    Download and load the CrowdCent meta model.

    Args:
        api_key: CrowdCent API key (if None, will try to load from env)
        challenge_slug: The challenge to download data for
        download_path: Optional path to save the downloaded file
        date_col: Date column name in the meta model
        id_col: Asset ID column name in the meta model
        pred_col: Prediction column name to use from the meta model

    Returns:
        Polars DataFrame with original column names
    """
    from crowdcent_challenge import ChallengeClient

    if api_key is None:
        import os

        api_key = os.getenv("CROWDCENT_API_KEY")
        if not api_key:
            raise ValueError("CROWDCENT_API_KEY not found in environment variables")

    client = ChallengeClient(challenge_slug=challenge_slug, api_key=api_key)

    if download_path is None:
        download_path = "predictions.parquet"

    client.download_meta_model(download_path)

    return DataLoader.from_file(
        path=download_path, date_col=date_col, id_col=id_col, pred_col=pred_col
    )

from_dataframe(df, date_col, id_col, pred_col) staticmethod

Create a DataFrame data source and load data.

Source code in src/cc_liquid/data_loader.py
@staticmethod
def from_dataframe(
    df: pl.DataFrame, date_col: str, id_col: str, pred_col: str
) -> pl.DataFrame:
    """Create a DataFrame data source and load data."""
    return DataFrameDataSource(
        df,
        date_column=date_col,
        asset_id_column=id_col,
        prediction_column=pred_col,
    ).load()

from_file(path, date_col, id_col, pred_col) staticmethod

Create a file data source and load data.

Source code in src/cc_liquid/data_loader.py
@staticmethod
def from_file(path: str, date_col: str, id_col: str, pred_col: str) -> pl.DataFrame:
    """Create a file data source and load data."""
    return FileDataSource(
        path,
        date_column=date_col,
        asset_id_column=id_col,
        prediction_column=pred_col,
    ).load()

from_numerai_api(download_path=None, date_col='date', id_col='symbol', pred_col='meta_model') staticmethod

Download and load the Numerai crypto meta model.

Parameters:

Name Type Description Default
download_path str | None

Optional path to save the downloaded file

None
date_col str

Date column name in the meta model

'date'
id_col str

Asset ID/symbol column name in the meta model

'symbol'
pred_col str

Prediction column name to use from the meta model

'meta_model'

Returns:

Type Description
DataFrame

Polars DataFrame with original column names

Source code in src/cc_liquid/data_loader.py
@staticmethod
def from_numerai_api(
    download_path: str | None = None,
    date_col: str = "date",
    id_col: str = "symbol",
    pred_col: str = "meta_model",
) -> pl.DataFrame:
    """
    Download and load the Numerai crypto meta model.

    Args:
        download_path: Optional path to save the downloaded file
        date_col: Date column name in the meta model
        id_col: Asset ID/symbol column name in the meta model
        pred_col: Prediction column name to use from the meta model

    Returns:
        Polars DataFrame with original column names
    """
    try:
        from numerapi import CryptoAPI
    except ImportError:
        raise ImportError(
            "numerapi is required. Install with: uv add cc-liquid[numerai]"
        )

    api = CryptoAPI()

    if download_path is None:
        download_path = "predictions.parquet"

    api.download_dataset("v1.0/historical_meta_models.parquet", download_path)

    return DataLoader.from_file(
        path=download_path, date_col=date_col, id_col=id_col, pred_col=pred_col
    )

Trading logic

CCLiquid

Handles all interactions with the Hyperliquid exchange.

Source code in src/cc_liquid/trader.py
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
class CCLiquid:
    """
    Handles all interactions with the Hyperliquid exchange.
    """

    def __init__(
        self,
        config: Config,
        callbacks: CCLiquidCallbacks | None = None,
        skip_ws: bool = True,
    ):
        self.config = config
        self.callbacks = callbacks or NoOpCallbacks()

        # Validate config for trading operations
        self.config.validate_for_trading()

        self.account: LocalAccount = self._get_account()
        self.exchange = Exchange(
            self.account,
            self.config.base_url,
            vault_address=(self.config.HYPERLIQUID_VAULT_ADDRESS or None),
            account_address=self.config.HYPERLIQUID_ADDRESS,
        )
        self.info = Info(self.config.base_url, skip_ws=skip_ws)
        self.logger = logging.getLogger(__name__)
        # Lazy-loaded map of coin -> szDecimals from Info.meta()["universe"].
        # Perps only: Hyperliquid perps use max 6 decimals for price rules.
        self._coin_to_sz_decimals: dict[str, int] | None = None

    def _get_account(self) -> LocalAccount:
        """Creates an eth_account LocalAccount object from the private key."""
        from eth_account import Account

        return Account.from_key(self.config.HYPERLIQUID_PRIVATE_KEY)

    def get_user_state(self) -> dict[str, Any]:
        """Retrieves the current state of the user's account."""
        # Always query Info using the portfolio owner: vault (if set) or master address.
        # Never use the agent/signer address for Info, as it has no balances.
        owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
        if not owner:
            raise ValueError(
                "Missing portfolio owner. Set HYPERLIQUID_VAULT_ADDRESS or HYPERLIQUID_ADDRESS."
            )
        return self.info.user_state(owner)

    def get_positions(self) -> dict[str, Any]:
        """Retrieves the user's open positions as a dict."""
        user_state = self.get_user_state()
        positions = {}
        for position_data in user_state.get("assetPositions", []):
            position = position_data.get("position", {})
            if float(position.get("szi", 0)) != 0:
                positions[position["coin"]] = position
        return positions

    def get_account_value(self) -> float:
        """Retrieves the total account value in USD."""
        user_state = self.get_user_state()
        return float(user_state["marginSummary"]["accountValue"])

    def get_portfolio_info(self) -> PortfolioInfo:
        """Get complete portfolio information as structured data."""
        try:
            user_state = self.get_user_state()
        except Exception as e:
            self.logger.warning(f"Could not get user state: {e}")
            # Return empty portfolio if we can't connect
            return PortfolioInfo(
                account=AccountInfo(
                    account_value=0,
                    total_position_value=0,
                    margin_used=0,
                    free_collateral=0,
                    cash_balance=0,
                    withdrawable=0,
                    current_leverage=0,
                ),
                positions=[],
            )

        margin_summary = user_state.get("marginSummary", {}) if user_state else {}
        all_mids = self.info.all_mids() if user_state else {}

        # Build account info
        account_info = AccountInfo(
            account_value=float(margin_summary.get("accountValue", 0)),
            total_position_value=float(margin_summary.get("totalNtlPos", 0)),
            margin_used=float(margin_summary.get("totalMarginUsed", 0)),
            free_collateral=float(margin_summary.get("accountValue", 0))
            - float(margin_summary.get("totalMarginUsed", 0)),
            cash_balance=float(margin_summary.get("totalRawUsd", 0)),
            withdrawable=float(user_state.get("withdrawable", 0)),
            current_leverage=float(margin_summary.get("totalNtlPos", 0))
            / float(margin_summary.get("accountValue", 1))
            if float(margin_summary.get("accountValue", 0)) > 0
            else 0,
            raw_margin_summary=margin_summary,
        )

        # Add cross margin info if available
        cross_margin = user_state.get("crossMarginSummary")
        if cross_margin:
            account_info.cross_leverage = (
                float(cross_margin.get("accountValue", 0))
                / float(margin_summary.get("accountValue", 1))
                if float(margin_summary.get("accountValue", 0)) > 0
                else 0
            )
            account_info.cross_margin_used = float(
                cross_margin.get("totalMarginUsed", 0)
            )
            account_info.cross_maintenance_margin = float(
                cross_margin.get("totalMaintenanceMargin", 0)
            )
            account_info.raw_cross_margin_summary = cross_margin

        # Build positions list
        positions = []
        for position_data in user_state.get("assetPositions", []):
            pos = position_data.get("position", {})
            size = float(pos.get("szi", 0))

            if size == 0:
                continue

            coin = pos["coin"]
            entry_px = float(pos.get("entryPx", 0))
            mark_px = float(all_mids.get(coin, entry_px))
            position_value = abs(size * mark_px)

            # Calculate unrealized PnL
            if size > 0:
                unrealized_pnl = (mark_px - entry_px) * size
                side = "LONG"
            else:
                unrealized_pnl = (entry_px - mark_px) * abs(size)
                side = "SHORT"

            return_pct = (
                (unrealized_pnl / (abs(size) * entry_px) * 100) if entry_px > 0 else 0
            )

            positions.append(
                Position(
                    coin=coin,
                    side=side,
                    size=abs(size),
                    entry_price=entry_px,
                    mark_price=mark_px,
                    value=position_value,
                    unrealized_pnl=unrealized_pnl,
                    return_pct=return_pct,
                    liquidation_price=float(pos["liquidationPx"])
                    if "liquidationPx" in pos and pos["liquidationPx"] is not None
                    else None,
                    margin_used=float(pos["marginUsed"])
                    if "marginUsed" in pos and pos["marginUsed"] is not None
                    else None,
                )
            )

        return PortfolioInfo(account=account_info, positions=positions)

    # --- Rounding helpers (Perps only) ---
    def _load_sz_decimals_map(self, force_refresh: bool = False) -> dict[str, int]:
        """Load and cache coin -> szDecimals from exchange meta.

        Per Hyperliquid rounding guidance for orders, sizes must be rounded to a
        coin-specific number of decimals (szDecimals). We cache from `info.meta()`
        and refresh on demand.
        """
        if self._coin_to_sz_decimals is None or force_refresh:
            try:
                universe = self.info.meta().get("universe", [])
                self._coin_to_sz_decimals = {
                    asset.get("name"): int(asset.get("szDecimals", 2))
                    for asset in universe
                    if asset.get("name") and not asset.get("isDelisted", False)
                }
            except Exception as e:
                self.logger.warning(f"Failed to load szDecimals map: {e}")
                self._coin_to_sz_decimals = {}
        return self._coin_to_sz_decimals

    def _get_sz_decimals(self, coin: str) -> int | None:
        """Return szDecimals for the given coin, refreshing meta once if needed."""
        sz_map = self._load_sz_decimals_map()
        if coin not in sz_map:
            sz_map = self._load_sz_decimals_map(force_refresh=True)
        return sz_map.get(coin)

    def _round_size(self, coin: str, size: float) -> tuple[float, int] | None:
        """Round size per coin's szDecimals.

        Returns (rounded_size, sz_decimals) or None if szDecimals are unknown.
        """
        sz_decimals = self._get_sz_decimals(coin)
        if sz_decimals is None:
            return None
        return round(size, sz_decimals), sz_decimals

    def _round_price_perp(self, coin: str, px: float) -> float:
        """Round price according to Hyperliquid perp rules (used for limit orders).

        Rules (per Hyperliquid):
        - If px > 100_000: round to integer.
        - Else: round to 5 significant figures and at most (6 - szDecimals) decimals.
        Reference: Hyperliquid SDK example rounding: see rounding.py
        """
        if px > 100_000:
            return round(px)
        sz_decimals = self._get_sz_decimals(coin)
        # If unknown, still limit to 5 significant figures as a safe default.
        if sz_decimals is None:
            return float(f"{px:.5g}")
        max_decimals = 6  # perps
        return round(float(f"{px:.5g}"), max_decimals - sz_decimals)

    def plan_rebalance(self, predictions: pl.DataFrame | None = None) -> dict:
        """Compute a rebalancing plan without executing orders."""
        # Check for open orders (warning if present)
        open_orders = self.get_open_orders()
        if open_orders:
            self.callbacks.warn(
                f"Found {len(open_orders)} open order(s). These may conflict with rebalancing."
            )
        # Load predictions if not provided
        if predictions is None:
            self.callbacks.info("Loading predictions...")
            predictions = self._load_predictions()

            if predictions is None or predictions.is_empty():
                self.callbacks.error("No predictions available, cannot rebalance")
                return {
                    "target_positions": {},
                    "trades": [],
                    "skipped_trades": [],
                    "account_value": 0.0,
                    "leverage": self.config.portfolio.target_leverage,
                    "open_orders": open_orders,
                }

            # Display prediction info
            unique_assets = predictions[self.config.data.asset_id_column].n_unique()
            latest_data = predictions[self.config.data.date_column].max()
            self.callbacks.info(
                f"Loaded predictions for {unique_assets} assets (latest: {latest_data})"
            )

        # Asset Selection, Position Calculation, and Trade Generation
        target_positions = self._get_target_positions(predictions)
        current_positions = self.get_positions()
        trades, skipped_trades = self._calculate_trades(
            target_positions, current_positions
        )

        # Build plan (including skipped trades)
        account_value = self.get_account_value()
        leverage = self.config.portfolio.target_leverage
        return {
            "target_positions": target_positions,
            "trades": trades,
            "skipped_trades": skipped_trades,
            "account_value": account_value,
            "leverage": leverage,
            "open_orders": open_orders,
        }

    def execute_plan(self, plan: dict) -> dict:
        """Execute a precomputed plan, returning structured results."""
        trades: list[dict] = plan.get("trades", [])
        if not trades:
            # Nothing to do
            return {"successful_trades": [], "all_trades": trades}

        # Prioritize leverage reduction: execute closes/reductions (and flips) before opens
        trades = self._sort_trades_for_leverage_reduction(trades)

        self.callbacks.info(f"Starting execution of {len(trades)} trades...")
        successful_trades = self._execute_trades(trades)

        # Apply stop losses after execution
        sl_result = None
        if self.config.portfolio.stop_loss.sides != "none":
            self.callbacks.info("Applying stop losses to positions...")
            sl_result = self.apply_stop_losses()

            # Report SL results
            if sl_result.get("status") == "ok":
                applied_count = sl_result.get("total_applied", 0)
                if applied_count > 0:
                    self.callbacks.info(f"✓ Placed {applied_count} stop loss order(s)")

                # Warn about resting orders
                resting = [t for t in successful_trades if t.get("resting")]
                if resting:
                    self.callbacks.warn(
                        f"{len(resting)} order(s) resting on book. "
                        "Run 'cc-liquid apply-stops' after they fill to add protection."
                    )

        return {
            "successful_trades": successful_trades,
            "all_trades": trades,
            "stop_loss_result": sl_result
        }

    def plan_close_all_positions(self, *, force: bool = False) -> dict:
        """Plan to close all open positions (return to cash) without executing orders."""
        current_positions = self.get_positions()

        if not current_positions:
            self.callbacks.info("No open positions to close.")
            return {
                "target_positions": {},
                "trades": [],
                "skipped_trades": [],
                "account_value": self.get_account_value(),
                "leverage": self.config.portfolio.target_leverage,
            }

        self.callbacks.info("Closing all positions to return to cash...")

        # Create target positions of 0 for all current positions
        target_positions = {coin: 0 for coin in current_positions.keys()}
        trades, skipped_trades = self._calculate_trades(
            target_positions, current_positions, force=force
        )

        account_value = self.get_account_value()
        leverage = self.config.portfolio.target_leverage
        return {
            "target_positions": target_positions,
            "trades": trades,
            "skipped_trades": skipped_trades,
            "account_value": account_value,
            "leverage": leverage,
        }

    def _get_target_positions(self, predictions: pl.DataFrame) -> dict[str, float]:
        """Calculate target notionals using configurable weighting scheme."""

        latest_predictions = self._get_latest_predictions(predictions)
        tradeable_predictions = self._filter_tradeable_predictions(latest_predictions)

        if tradeable_predictions.height == 0:
            return {}

        id_col = self.config.data.asset_id_column
        pred_col = self.config.data.prediction_column

        sorted_preds = tradeable_predictions.sort(pred_col, descending=True)

        num_long = self.config.portfolio.num_long
        num_short = self.config.portfolio.num_short

        if sorted_preds.height < num_long + num_short:
            self.callbacks.warn(
                f"Limited tradeable assets: {sorted_preds.height} available; "
                f"requested {num_long} longs and {num_short} shorts"
            )

        long_assets = sorted_preds.head(num_long)[id_col].to_list()
        short_assets = (
            sorted_preds.sort(pred_col, descending=False)
            .head(num_short)[id_col]
            .to_list()
            if num_short > 0
            else []
        )

        account_value = self.get_account_value()
        target_leverage = self.config.portfolio.target_leverage
        total_positions = len(long_assets) + len(short_assets)

        if total_positions == 0 or account_value <= 0 or target_leverage <= 0:
            return {}

        self.callbacks.info(
            f"Target gross leverage: {target_leverage:.2f}x across {total_positions} positions"
        )

        weights = weights_from_ranks(
            latest_preds=tradeable_predictions.select([id_col, pred_col]),
            id_col=id_col,
            pred_col=pred_col,
            long_assets=long_assets,
            short_assets=short_assets,
            target_gross=target_leverage,
            power=self.config.portfolio.rank_power,
        )

        target_positions = {
            asset: weight * account_value for asset, weight in weights.items()
        }

        # Warn if resulting notionals fall below exchange minimums
        min_notional = self.config.execution.min_trade_value
        undersized = [
            asset
            for asset, weight in target_positions.items()
            if abs(weight) < min_notional
        ]
        if undersized:
            self.callbacks.warn(
                "Some target positions fall below minimum notional: "
                + ", ".join(sorted(undersized))
            )

        return target_positions

    def _get_latest_predictions(self, predictions: pl.DataFrame) -> pl.DataFrame:
        """Filters for the latest predictions for each asset by date."""
        return (
            predictions.sort(self.config.data.date_column, descending=True)
            .group_by(self.config.data.asset_id_column)
            .first()
        )

    def _filter_tradeable_predictions(self, predictions: pl.DataFrame) -> pl.DataFrame:
        """Filter predictions to Hyperliquid-listed assets."""

        universe = self.info.meta()["universe"]
        available_assets = {
            p["name"] for p in universe if not p.get("isDelisted", False)
        }

        tradeable = predictions.filter(
            pl.col(self.config.data.asset_id_column).is_in(available_assets)
        )

        if tradeable.height == 0:
            self.logger.warning("No predictions match Hyperliquid tradeable assets!")
            self.callbacks.error(
                "Error: No predictions match Hyperliquid tradeable assets!"
            )
            self.callbacks.info(
                f"Available on Hyperliquid: {sorted(list(available_assets)[:10])}{'...' if len(available_assets) > 10 else ''}"
            )
            prediction_assets = (
                predictions[self.config.data.asset_id_column].unique().to_list()
            )
            self.callbacks.info(
                f"In predictions: {sorted(prediction_assets[:10])}{'...' if len(prediction_assets) > 10 else ''}"
            )

        return tradeable

    def _calculate_trades(
        self,
        target_positions: dict[str, float],
        current_positions: dict[str, Any],
        *,
        force: bool = False,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Calculates the trades required to reach the target portfolio using market orders.

        Returns:
            (executable_trades, skipped_trades) - trades that can be executed and those below minimum
        """
        trades = []
        skipped_trades = []  # Track trades we can't execute
        all_mids = self.info.all_mids()


        fee_info = self.get_fee_summary()
        taker_rate = float(fee_info.get("userCrossRate", 0.00035))

        all_assets = set(target_positions.keys()) | set(current_positions.keys())

        for asset in all_assets:
            target_value = target_positions.get(asset, 0)

            # Get current position details
            current_position = current_positions.get(asset, {})
            current_size = float(current_position.get("szi", 0))

            # Ensure we have a mid price; otherwise skip
            if asset not in all_mids:
                skipped_trades.append(
                    {
                        "coin": asset,
                        "target_value": target_value,
                        "skipped": True,
                        "skip_reason": "No mid price available",
                    }
                )
                continue

            price = float(all_mids[asset])

            # Calculate current value with proper sign
            # szi is positive for long, negative for short
            current_value = current_size * price

            # Calculate the value delta we need to achieve
            delta_value = target_value - current_value

            # Determine trade direction
            # If delta_value > 0, we need to buy (increase position or reduce short)
            # If delta_value < 0, we need to sell (decrease position or increase short)
            is_buy = delta_value > 0
            size = abs(delta_value) / price

            # Round the size using szDecimals from meta (perps only)
            coin = asset
            rounded = self._round_size(coin, size)
            if rounded is None:
                skipped_trades.append(
                    {
                        "coin": asset,
                        "target_value": target_value,
                        "skipped": True,
                        "skip_reason": "Unknown szDecimals (meta)",
                    }
                )
                continue
            size, sz_decimals = rounded

            # If rounding collapses to zero, skip
            if size == 0:
                skipped_trades.append(
                    {
                        "coin": asset,
                        "target_value": target_value,
                        "skipped": True,
                        "skip_reason": f"Rounded size is 0 at {sz_decimals} dp",
                    }
                )
                continue

            # Check if trade is below minimum value threshold
            min_trade_value = self.config.execution.min_trade_value
            # Classify the trade type for clearer downstream handling
            # Types: open, close, reduce, increase, flip
            trade_type: str
            if current_value == 0:
                trade_type = "open" if target_value != 0 else "increase"
            elif target_value == 0:
                trade_type = "close"
            else:
                same_sign = (current_value > 0 and target_value > 0) or (
                    current_value < 0 and target_value < 0
                )
                if same_sign:
                    trade_type = (
                        "reduce"
                        if abs(target_value) < abs(current_value)
                        else "increase"
                    )
                else:
                    trade_type = "flip"

            trade_data = {
                "coin": asset,
                "is_buy": is_buy,
                "sz": size,
                "price": price,
                "current_value": current_value,
                "target_value": target_value,
                "delta_value": delta_value,
                "type": trade_type,
                "estimated_fee": abs(delta_value) * taker_rate,
            }

            # Re-evaluate min notional AFTER rounding size
            if abs(size * price) < min_trade_value:
                # Below minimum. If not forcing or not a pure close-to-zero scenario, skip.
                if not force or target_value != 0:
                    trade_data["skipped"] = True
                    trade_data["skip_reason"] = f"Below minimum ${min_trade_value}"
                    skipped_trades.append(trade_data)
                else:
                    forced, reason = self._compose_force_close_trades(
                        asset, price, current_value, min_trade_value, taker_rate
                    )
                    if forced is None:
                        skipped_trades.append(
                            {
                                "coin": asset,
                                "target_value": target_value,
                                "skipped": True,
                                "skip_reason": reason
                                or "Force close composition failed",
                            }
                        )
                    else:
                        trades.extend(forced)
            else:
                # Add to executable trades
                trades.append(trade_data)

        return trades, skipped_trades

    def _sort_trades_for_leverage_reduction(
        self, trades: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Return trades ordered to reduce leverage first using explicit trade types.

        Priority: close (0), reduce/flip (1), increase (2), open (3). Stable ordering within groups.
        """
        priority = {"close": 0, "reduce": 1, "flip": 1, "increase": 2, "open": 3}

        def sort_key(t: dict[str, Any]):
            # Forced close chains must execute in sequence: increase (0) then close (1)
            if t.get("force"):
                return (0, t.get("force_id", ""), t.get("force_seq", 0))
            return (1, priority.get(t.get("type", "increase"), 2), 0)

        return sorted(trades, key=sort_key)

    def _compose_force_close_trades(
        self, coin: str, price: float, current_value: float, min_trade_value: float, taker_rate: float
    ) -> tuple[list[dict[str, Any]] | None, str | None]:
        """Compose the two-step forced close for sub-minimum closes.
        i.e. if we have a position of less than $10, we want to close it to $0, we need to increase the position
        to at least $10, then close it to $0.

        Returns (trades, None) on success or (None, reason) on failure.
        """
        rounded_up = self._round_size_up_to_min_notional(coin, min_trade_value, price)
        if rounded_up is None:
            return None, "Unknown szDecimals (meta)"
        min_increase_sz, _ = rounded_up

        increase_is_buy = current_value > 0
        force_id = f"force_close:{coin}"

        step1_delta = min_increase_sz * price if increase_is_buy else -(min_increase_sz * price)

        step1 = {
            "coin": coin,
            "is_buy": increase_is_buy,
            "sz": min_increase_sz,
            "price": price,
            "current_value": current_value,
            "target_value": current_value
            + (
                min_increase_sz * price
                if current_value >= 0
                else -min_increase_sz * price
            ),
            "delta_value": step1_delta,
            "type": "increase",
            "force": True,
            "force_id": force_id,
            "force_seq": 0,
            "estimated_fee": abs(step1_delta) * taker_rate,
        }

        total_notional_to_close = abs(current_value) + (min_increase_sz * price)
        close_is_buy = not increase_is_buy
        close_sz_rounded = self._round_size(coin, total_notional_to_close / price)
        if close_sz_rounded is None:
            return None, "Unknown szDecimals (meta)"
        close_sz, _ = close_sz_rounded

        step2_delta = total_notional_to_close if close_is_buy else -total_notional_to_close

        step2 = {
            "coin": coin,
            "is_buy": close_is_buy,
            "sz": close_sz,
            "price": price,
            "current_value": current_value
            + (
                min_increase_sz * price
                if increase_is_buy
                else -(min_increase_sz * price)
            ),
            "target_value": 0,
            "delta_value": step2_delta,
            "type": "close",
            "force": True,
            "force_id": force_id,
            "force_seq": 1,
            "estimated_fee": abs(step2_delta) * taker_rate,
        }

        # Ensure both meet minimum notional
        if (step1["sz"] * price) < min_trade_value or (
            step2["sz"] * price
        ) < min_trade_value:
            return (
                None,
                f"Below minimum even after force composition (${min_trade_value})",
            )

        return [step1, step2], None

    def _round_size_up_to_min_notional(
        self, coin: str, target_notional: float, price: float
    ) -> tuple[float, int] | None:
        """Return (size, decimals) such that size*price >= target_notional after rounding to szDecimals.

        Rounds up to the nearest step defined by szDecimals to satisfy the notional constraint.
        """
        sz_decimals = self._get_sz_decimals(coin)
        if sz_decimals is None:
            return None
        raw_size = target_notional / price if price > 0 else 0
        if raw_size <= 0:
            return (0.0, sz_decimals)
        step = 10 ** (-sz_decimals)
        # Avoid floating imprecision by working in integer steps
        steps_needed = math.ceil(raw_size / step)
        rounded_up_size = steps_needed * step
        # Round to the allowed decimals to avoid long floats
        rounded_up_size = round(rounded_up_size, sz_decimals)
        return rounded_up_size, sz_decimals

    def _execute_trades(self, trades: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Executes a list of trades sequentially with configurable order type and time-in-force."""
        if not trades:
            return []

        successful_trades = []
        failed_trades = []

        # Show progress during execution
        self.callbacks.info(f"Executing {len(trades)} trades...")

        for i, trade in enumerate(trades, 1):
            # Notify callback of trade start
            self.callbacks.on_trade_start(i, len(trades), trade)

            try:
                self.logger.debug(f"Executing trade: {trade}")

                coin = trade["coin"]
                is_buy = trade["is_buy"]
                size = trade["sz"]

                # Determine limit price based on order_type
                if self.config.execution.order_type == "limit":
                    # Use passive pricing: buy below mid, sell above mid
                    mid_price = float(self.info.all_mids()[coin])
                    offset = self.config.execution.limit_price_offset

                    if is_buy:
                        limit_px = self._round_price_perp(coin, mid_price * (1 - offset))
                    else:
                        limit_px = self._round_price_perp(coin, mid_price * (1 + offset))
                else:  # market
                    # Use slippage price (aggressive) for market orders
                    limit_px = self.exchange._slippage_price(
                        coin, is_buy, self.config.execution.slippage_tolerance
                    )

                # Build single order request
                order_request = {
                    "coin": coin,
                    "is_buy": is_buy,
                    "sz": size,
                    "limit_px": limit_px,
                    "order_type": {"limit": {"tif": self.config.execution.time_in_force}},
                    "reduce_only": False,
                }

                # Execute single order via bulk_orders with single-item list
                result = self.exchange.bulk_orders([order_request])

                # Handle error responses
                if result.get("status") == "err":
                    error_msg = result.get("response", "Unknown error")
                    self.callbacks.on_trade_fail(trade, error_msg)
                    failed_trades.append(trade)
                    continue

                # Handle success responses
                response = result.get("response", {})
                if not isinstance(response, dict):
                    self.callbacks.on_trade_fail(trade, f"Unexpected response format: {response}")
                    failed_trades.append(trade)
                    continue

                statuses = response.get("data", {}).get("statuses", [])

                if not statuses:
                    self.callbacks.on_trade_fail(trade, "No status returned")
                    failed_trades.append(trade)
                    continue

                status = statuses[0]  # Get first (and only) status

                # Check for filled orders
                if "filled" in status:
                    filled_data = status["filled"]
                    avg_px = float(filled_data.get("avgPx", trade["price"]))

                    # Calculate slippage
                    if trade["is_buy"]:
                        slippage_pct = (
                            (avg_px - trade["price"]) / trade["price"]
                        ) * 100
                    else:
                        slippage_pct = (
                            (trade["price"] - avg_px) / trade["price"]
                        ) * 100

                    # Extract actual fee from API response
                    actual_fee = float(filled_data.get("fee", 0))

                    self.callbacks.on_trade_fill(trade, filled_data, slippage_pct)

                    successful_trades.append(
                        {
                            **trade,
                            "fill_data": filled_data,
                            "slippage_pct": slippage_pct,
                            "actual_fee": actual_fee,
                            "status": "filled",
                        }
                    )

                # Check for resting orders (Gtc/Alo orders posted to book)
                elif "resting" in status and self.config.execution.time_in_force in ("Gtc", "Alo"):
                    resting_data = status["resting"]
                    oid = resting_data.get("oid")

                    # This is a success for Gtc/Alo - order is on the book
                    self.callbacks.info(
                        f"  {trade['coin']}: Order posted to book (OID: {oid}). "
                        f"Check with 'cc-liquid orders'"
                    )

                    successful_trades.append(
                        {
                            **trade,
                            "resting": True,
                            "oid": oid,
                            "status": "resting",
                        }
                    )

                # Handle errors or actual failures
                else:
                    if "error" in status:
                        error_msg = status["error"]
                    else:
                        error_msg = "Order rejected or not filled"

                    self.callbacks.on_trade_fail(trade, error_msg)
                    failed_trades.append(trade)

            except Exception as e:
                self.callbacks.on_trade_fail(trade, str(e))
                self.logger.error(f"Error executing trade for {trade['coin']}: {e}")
                failed_trades.append(trade)

        # Notify callback of batch completion
        self.callbacks.on_batch_complete(successful_trades, failed_trades)

        return successful_trades

    def load_state(self) -> datetime | None:
        """Public wrapper to load last rebalance timestamp."""
        return self._load_state()

    def save_state(self, last_rebalance_date: datetime) -> None:
        """Public wrapper to persist last rebalance timestamp."""
        self._save_state(last_rebalance_date)

    def compute_next_rebalance_time(
        self, last_rebalance_date: datetime | None, now: datetime | None = None
    ) -> datetime:
        """Compute the next scheduled rebalance timestamp in UTC.

        Rules:
        - If this is the first run (no last date): schedule for today at configured time; if
          already past that time, return "now" to indicate it is due immediately.
        - Otherwise: schedule exactly every_n_days after the last rebalance date, at the
          configured time.
        """
        cfg = self.config.portfolio.rebalancing
        now_utc = now or datetime.now(timezone.utc)

        hour, minute = map(int, cfg.at_time.split(":"))
        rebalance_time = time(hour=hour, minute=minute)

        if last_rebalance_date is None:
            today_at = datetime.combine(
                now_utc.date(), rebalance_time, tzinfo=timezone.utc
            )
            return today_at if now_utc < today_at else now_utc

        next_date = last_rebalance_date.date() + timedelta(days=cfg.every_n_days)
        return datetime.combine(next_date, rebalance_time, tzinfo=timezone.utc)

    def _load_state(self) -> datetime | None:
        """Load the last rebalance date from persistent state."""
        import json
        import os

        state_file = ".cc_liquid_state.json"
        if not os.path.exists(state_file):
            return None

        try:
            with open(state_file) as f:
                state = json.load(f)
                last_date = datetime.fromisoformat(state.get("last_rebalance_date"))
                return last_date
        except Exception as e:
            self.logger.warning(f"Could not load state file: {e}")
            return None

    def _save_state(self, last_rebalance_date: datetime):
        """Save the last rebalance date to persistent state."""
        import json

        state_file = ".cc_liquid_state.json"
        with open(state_file, "w") as f:
            json.dump({"last_rebalance_date": last_rebalance_date.isoformat()}, f)

    def get_open_orders(self) -> list[dict[str, Any]]:
        """Get current open orders.

        Returns:
            List of open orders with details like coin, size, limit price, side, etc.
        """
        owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
        if not owner:
            raise ValueError("Missing portfolio owner address")
        return self.info.open_orders(owner)

    def cancel_open_orders(self, coin: str | None = None) -> dict[str, Any]:
        """Cancel open orders, optionally filtered by coin.

        Args:
            coin: If provided, only cancel orders for this coin. If None, cancel all.

        Returns:
            Result of the cancel operation
        """
        open_orders = self.get_open_orders()

        if not open_orders:
            return {"status": "ok", "response": "No open orders to cancel"}

        # Filter by coin if specified
        if coin:
            orders_to_cancel = [o for o in open_orders if o["coin"] == coin]
        else:
            orders_to_cancel = open_orders

        if not orders_to_cancel:
            return {
                "status": "ok",
                "response": f"No open orders found for {coin}" if coin else "No orders to cancel",
            }

        # Build cancel requests
        cancel_requests = [
            {"coin": order["coin"], "oid": order["oid"]} for order in orders_to_cancel
        ]

        # Execute bulk cancel
        self.logger.info(f"Cancelling {len(cancel_requests)} open orders...")
        result = self.exchange.bulk_cancel(cancel_requests)

        return result

    def get_fill_history(
        self, start_time: int | None = None, end_time: int | None = None
    ) -> list[dict[str, Any]]:
        """Get fill history with optional time range.

        Args:
            start_time: Unix timestamp in milliseconds (optional)
            end_time: Unix timestamp in milliseconds (optional)

        Returns:
            List of fills with execution details, prices, sizes, and fees
        """
        owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
        if not owner:
            raise ValueError("Missing portfolio owner address")

        if start_time is not None:
            return self.info.user_fills_by_time(owner, start_time, end_time)
        return self.info.user_fills(owner)

    def get_fee_summary(self) -> dict[str, Any]:
        """Get fee rates and trading volume statistics.

        Returns:
            Dictionary containing fee rates (maker/taker), volume stats, and fee schedule
        """
        owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
        if not owner:
            raise ValueError("Missing portfolio owner address")
        return self.info.user_fees(owner)

    def _should_apply_stop_loss(self, side: str) -> bool:
        """Check if stop loss should be applied to a position side."""
        sides_config = self.config.portfolio.stop_loss.sides
        if sides_config == "none":
            return False
        if sides_config == "both":
            return True
        if sides_config == "long_only" and side == "LONG":
            return True
        if sides_config == "short_only" and side == "SHORT":
            return True
        return False

    def cancel_all_tpsl_orders(self) -> dict[str, Any]:
        """Cancel all existing TP/SL orders across the portfolio."""
        open_orders = self.get_open_orders()

        if not open_orders:
            return {"status": "ok", "response": "No open orders", "cancelled": 0}

        # Filter for TP/SL orders - check both nested structure and direct
        tpsl_orders = []
        for o in open_orders:
            order_type = o.get("orderType", {})
            # Check if it's a trigger order (TP/SL)
            if isinstance(order_type, dict) and "trigger" in order_type:
                tpsl_orders.append(o)
            # Also check string format if API returns it differently
            elif isinstance(order_type, str) and "trigger" in order_type.lower():
                tpsl_orders.append(o)

        if not tpsl_orders:
            self.callbacks.info(f"No existing TP/SL orders to cancel (found {len(open_orders)} other orders)")
            return {"status": "ok", "response": "No TP/SL orders to cancel", "cancelled": 0}

        cancel_requests = [
            {"coin": order["coin"], "oid": order["oid"]} 
            for order in tpsl_orders
        ]

        self.callbacks.info(f"Cancelling {len(cancel_requests)} existing TP/SL order(s)...")
        result = self.exchange.bulk_cancel(cancel_requests)
        result["cancelled"] = len(cancel_requests)
        return result

    def apply_stop_losses(self) -> dict[str, Any]:
        """Apply stop losses to all current open positions per config.

        Returns:
            Dict with counts of applied/skipped SLs and any errors
        """
        if self.config.portfolio.stop_loss.sides == "none":
            return {
                "status": "disabled",
                "message": "Stop losses disabled in config (stop_loss.sides=none)"
            }

        # Get current positions
        positions = self.get_positions()
        if not positions:
            return {
                "status": "ok",
                "applied": 0,
                "message": "No open positions to protect"
            }

        # Cancel existing TP/SL orders first
        self.cancel_all_tpsl_orders()

        # Get current prices
        all_mids = self.info.all_mids()

        applied = []
        skipped = []
        errors = []

        # Count eligible positions first for progress tracking
        eligible_positions = []
        for coin, position in positions.items():
            size = float(position.get("szi", 0))
            if size == 0:
                continue
            side = "LONG" if size > 0 else "SHORT"
            if self._should_apply_stop_loss(side):
                eligible_positions.append(coin)

        total_eligible = len(eligible_positions)

        # Build all SL orders first
        orders_to_place = []
        order_metadata = []  # Track metadata for each order

        for coin, position in positions.items():
            size = float(position.get("szi", 0))
            if size == 0:
                continue

            side = "LONG" if size > 0 else "SHORT"

            # Check if this side should have SL
            if not self._should_apply_stop_loss(side):
                skipped.append({"coin": coin, "reason": f"Side {side} not configured"})
                continue

            # Get entry price
            entry_px = float(position.get("entryPx", 0))
            if entry_px <= 0:
                skipped.append({"coin": coin, "reason": "Invalid entry price"})
                continue

            # Calculate trigger price
            stop_pct = self.config.portfolio.stop_loss.pct
            if side == "LONG":
                trigger_px = entry_px * (1 - stop_pct)
                is_buy = False  # SL on long = sell
            else:  # SHORT
                trigger_px = entry_px * (1 + stop_pct)
                is_buy = True  # SL on short = buy

            # Calculate limit price with slippage
            slippage = self.config.portfolio.stop_loss.slippage
            if is_buy:
                limit_px = trigger_px * (1 + slippage)
            else:
                limit_px = trigger_px * (1 - slippage)

            # Round prices
            trigger_px = self._round_price_perp(coin, trigger_px)
            limit_px = self._round_price_perp(coin, limit_px)

            # Round size
            rounded = self._round_size(coin, abs(size))
            if rounded is None:
                skipped.append({"coin": coin, "reason": "Unknown szDecimals"})
                continue
            sl_size, _ = rounded

            # Build SL order
            sl_order = {
                "coin": coin,
                "is_buy": is_buy,
                "sz": sl_size,
                "limit_px": limit_px,
                "order_type": {
                    "trigger": {
                        "isMarket": False,  # Use limit for custom slippage
                        "triggerPx": trigger_px,  # SDK handles string conversion
                        "tpsl": "sl"
                    }
                },
                "reduce_only": True,
            }

            orders_to_place.append(sl_order)
            order_metadata.append({
                "coin": coin,
                "side": side,
                "entry_px": entry_px,
                "trigger_px": trigger_px,
                "limit_px": limit_px,
                "size": sl_size
            })

        # Place all orders in one batch
        if orders_to_place:
            self.callbacks.info(f"Placing {len(orders_to_place)} stop loss order(s) in batch...")

            try:
                result = self.exchange.bulk_orders(orders_to_place)

                if result.get("status") == "ok":
                    # Process response statuses
                    response = result.get("response", {})
                    statuses = response.get("data", {}).get("statuses", [])

                    for i, status in enumerate(statuses):
                        if i >= len(order_metadata):
                            break

                        metadata = order_metadata[i]

                        if "resting" in status:
                            # SL order successfully placed
                            applied.append(metadata)
                        elif "error" in status:
                            errors.append({
                                "coin": metadata["coin"],
                                "error": status["error"]
                            })
                        else:
                            # Unexpected status
                            errors.append({
                                "coin": metadata["coin"],
                                "error": f"Unexpected status: {status}"
                            })
                else:
                    # Bulk operation failed entirely
                    for metadata in order_metadata:
                        errors.append({
                            "coin": metadata["coin"],
                            "error": result.get("response", "Bulk operation failed")
                        })
            except Exception as e:
                # Exception during bulk operation
                for metadata in order_metadata:
                    errors.append({
                        "coin": metadata["coin"],
                        "error": str(e)
                    })

        return {
            "status": "ok",
            "applied": applied,
            "skipped": skipped,
            "errors": errors,
            "total_applied": len(applied),
            "total_skipped": len(skipped),
            "total_errors": len(errors)
        }

    def _load_predictions(self) -> pl.DataFrame | None:
        """Load predictions based on configured data source."""
        try:
            if self.config.data.source == "local":
                # Use local file
                predictions = DataLoader.from_file(
                    self.config.data.path,
                    date_col=self.config.data.date_column,
                    id_col=self.config.data.asset_id_column,
                    pred_col=self.config.data.prediction_column,
                )
            elif self.config.data.source == "crowdcent":
                # Download and use CrowdCent meta model
                predictions = DataLoader.from_crowdcent_api(
                    api_key=self.config.CROWDCENT_API_KEY,
                    download_path=self.config.data.path,
                    date_col=self.config.data.date_column,
                    id_col=self.config.data.asset_id_column,
                    pred_col=self.config.data.prediction_column,
                )
            elif self.config.data.source == "numerai":
                # Download and use Numerai meta model
                predictions = DataLoader.from_numerai_api(
                    download_path=self.config.data.path,
                    date_col=self.config.data.date_column,
                    id_col=self.config.data.asset_id_column,
                    pred_col=self.config.data.prediction_column,
                )
            else:
                raise ValueError(f"Unknown data source: {self.config.data.source}")

            return predictions

        except Exception as e:
            self.logger.error(f"Error loading predictions: {e}")
            return None

apply_stop_losses()

Apply stop losses to all current open positions per config.

Returns:

Type Description
dict[str, Any]

Dict with counts of applied/skipped SLs and any errors

Source code in src/cc_liquid/trader.py
def apply_stop_losses(self) -> dict[str, Any]:
    """Apply stop losses to all current open positions per config.

    Returns:
        Dict with counts of applied/skipped SLs and any errors
    """
    if self.config.portfolio.stop_loss.sides == "none":
        return {
            "status": "disabled",
            "message": "Stop losses disabled in config (stop_loss.sides=none)"
        }

    # Get current positions
    positions = self.get_positions()
    if not positions:
        return {
            "status": "ok",
            "applied": 0,
            "message": "No open positions to protect"
        }

    # Cancel existing TP/SL orders first
    self.cancel_all_tpsl_orders()

    # Get current prices
    all_mids = self.info.all_mids()

    applied = []
    skipped = []
    errors = []

    # Count eligible positions first for progress tracking
    eligible_positions = []
    for coin, position in positions.items():
        size = float(position.get("szi", 0))
        if size == 0:
            continue
        side = "LONG" if size > 0 else "SHORT"
        if self._should_apply_stop_loss(side):
            eligible_positions.append(coin)

    total_eligible = len(eligible_positions)

    # Build all SL orders first
    orders_to_place = []
    order_metadata = []  # Track metadata for each order

    for coin, position in positions.items():
        size = float(position.get("szi", 0))
        if size == 0:
            continue

        side = "LONG" if size > 0 else "SHORT"

        # Check if this side should have SL
        if not self._should_apply_stop_loss(side):
            skipped.append({"coin": coin, "reason": f"Side {side} not configured"})
            continue

        # Get entry price
        entry_px = float(position.get("entryPx", 0))
        if entry_px <= 0:
            skipped.append({"coin": coin, "reason": "Invalid entry price"})
            continue

        # Calculate trigger price
        stop_pct = self.config.portfolio.stop_loss.pct
        if side == "LONG":
            trigger_px = entry_px * (1 - stop_pct)
            is_buy = False  # SL on long = sell
        else:  # SHORT
            trigger_px = entry_px * (1 + stop_pct)
            is_buy = True  # SL on short = buy

        # Calculate limit price with slippage
        slippage = self.config.portfolio.stop_loss.slippage
        if is_buy:
            limit_px = trigger_px * (1 + slippage)
        else:
            limit_px = trigger_px * (1 - slippage)

        # Round prices
        trigger_px = self._round_price_perp(coin, trigger_px)
        limit_px = self._round_price_perp(coin, limit_px)

        # Round size
        rounded = self._round_size(coin, abs(size))
        if rounded is None:
            skipped.append({"coin": coin, "reason": "Unknown szDecimals"})
            continue
        sl_size, _ = rounded

        # Build SL order
        sl_order = {
            "coin": coin,
            "is_buy": is_buy,
            "sz": sl_size,
            "limit_px": limit_px,
            "order_type": {
                "trigger": {
                    "isMarket": False,  # Use limit for custom slippage
                    "triggerPx": trigger_px,  # SDK handles string conversion
                    "tpsl": "sl"
                }
            },
            "reduce_only": True,
        }

        orders_to_place.append(sl_order)
        order_metadata.append({
            "coin": coin,
            "side": side,
            "entry_px": entry_px,
            "trigger_px": trigger_px,
            "limit_px": limit_px,
            "size": sl_size
        })

    # Place all orders in one batch
    if orders_to_place:
        self.callbacks.info(f"Placing {len(orders_to_place)} stop loss order(s) in batch...")

        try:
            result = self.exchange.bulk_orders(orders_to_place)

            if result.get("status") == "ok":
                # Process response statuses
                response = result.get("response", {})
                statuses = response.get("data", {}).get("statuses", [])

                for i, status in enumerate(statuses):
                    if i >= len(order_metadata):
                        break

                    metadata = order_metadata[i]

                    if "resting" in status:
                        # SL order successfully placed
                        applied.append(metadata)
                    elif "error" in status:
                        errors.append({
                            "coin": metadata["coin"],
                            "error": status["error"]
                        })
                    else:
                        # Unexpected status
                        errors.append({
                            "coin": metadata["coin"],
                            "error": f"Unexpected status: {status}"
                        })
            else:
                # Bulk operation failed entirely
                for metadata in order_metadata:
                    errors.append({
                        "coin": metadata["coin"],
                        "error": result.get("response", "Bulk operation failed")
                    })
        except Exception as e:
            # Exception during bulk operation
            for metadata in order_metadata:
                errors.append({
                    "coin": metadata["coin"],
                    "error": str(e)
                })

    return {
        "status": "ok",
        "applied": applied,
        "skipped": skipped,
        "errors": errors,
        "total_applied": len(applied),
        "total_skipped": len(skipped),
        "total_errors": len(errors)
    }

cancel_all_tpsl_orders()

Cancel all existing TP/SL orders across the portfolio.

Source code in src/cc_liquid/trader.py
def cancel_all_tpsl_orders(self) -> dict[str, Any]:
    """Cancel all existing TP/SL orders across the portfolio."""
    open_orders = self.get_open_orders()

    if not open_orders:
        return {"status": "ok", "response": "No open orders", "cancelled": 0}

    # Filter for TP/SL orders - check both nested structure and direct
    tpsl_orders = []
    for o in open_orders:
        order_type = o.get("orderType", {})
        # Check if it's a trigger order (TP/SL)
        if isinstance(order_type, dict) and "trigger" in order_type:
            tpsl_orders.append(o)
        # Also check string format if API returns it differently
        elif isinstance(order_type, str) and "trigger" in order_type.lower():
            tpsl_orders.append(o)

    if not tpsl_orders:
        self.callbacks.info(f"No existing TP/SL orders to cancel (found {len(open_orders)} other orders)")
        return {"status": "ok", "response": "No TP/SL orders to cancel", "cancelled": 0}

    cancel_requests = [
        {"coin": order["coin"], "oid": order["oid"]} 
        for order in tpsl_orders
    ]

    self.callbacks.info(f"Cancelling {len(cancel_requests)} existing TP/SL order(s)...")
    result = self.exchange.bulk_cancel(cancel_requests)
    result["cancelled"] = len(cancel_requests)
    return result

cancel_open_orders(coin=None)

Cancel open orders, optionally filtered by coin.

Parameters:

Name Type Description Default
coin str | None

If provided, only cancel orders for this coin. If None, cancel all.

None

Returns:

Type Description
dict[str, Any]

Result of the cancel operation

Source code in src/cc_liquid/trader.py
def cancel_open_orders(self, coin: str | None = None) -> dict[str, Any]:
    """Cancel open orders, optionally filtered by coin.

    Args:
        coin: If provided, only cancel orders for this coin. If None, cancel all.

    Returns:
        Result of the cancel operation
    """
    open_orders = self.get_open_orders()

    if not open_orders:
        return {"status": "ok", "response": "No open orders to cancel"}

    # Filter by coin if specified
    if coin:
        orders_to_cancel = [o for o in open_orders if o["coin"] == coin]
    else:
        orders_to_cancel = open_orders

    if not orders_to_cancel:
        return {
            "status": "ok",
            "response": f"No open orders found for {coin}" if coin else "No orders to cancel",
        }

    # Build cancel requests
    cancel_requests = [
        {"coin": order["coin"], "oid": order["oid"]} for order in orders_to_cancel
    ]

    # Execute bulk cancel
    self.logger.info(f"Cancelling {len(cancel_requests)} open orders...")
    result = self.exchange.bulk_cancel(cancel_requests)

    return result

compute_next_rebalance_time(last_rebalance_date, now=None)

Compute the next scheduled rebalance timestamp in UTC.

Rules: - If this is the first run (no last date): schedule for today at configured time; if already past that time, return "now" to indicate it is due immediately. - Otherwise: schedule exactly every_n_days after the last rebalance date, at the configured time.

Source code in src/cc_liquid/trader.py
def compute_next_rebalance_time(
    self, last_rebalance_date: datetime | None, now: datetime | None = None
) -> datetime:
    """Compute the next scheduled rebalance timestamp in UTC.

    Rules:
    - If this is the first run (no last date): schedule for today at configured time; if
      already past that time, return "now" to indicate it is due immediately.
    - Otherwise: schedule exactly every_n_days after the last rebalance date, at the
      configured time.
    """
    cfg = self.config.portfolio.rebalancing
    now_utc = now or datetime.now(timezone.utc)

    hour, minute = map(int, cfg.at_time.split(":"))
    rebalance_time = time(hour=hour, minute=minute)

    if last_rebalance_date is None:
        today_at = datetime.combine(
            now_utc.date(), rebalance_time, tzinfo=timezone.utc
        )
        return today_at if now_utc < today_at else now_utc

    next_date = last_rebalance_date.date() + timedelta(days=cfg.every_n_days)
    return datetime.combine(next_date, rebalance_time, tzinfo=timezone.utc)

execute_plan(plan)

Execute a precomputed plan, returning structured results.

Source code in src/cc_liquid/trader.py
def execute_plan(self, plan: dict) -> dict:
    """Execute a precomputed plan, returning structured results."""
    trades: list[dict] = plan.get("trades", [])
    if not trades:
        # Nothing to do
        return {"successful_trades": [], "all_trades": trades}

    # Prioritize leverage reduction: execute closes/reductions (and flips) before opens
    trades = self._sort_trades_for_leverage_reduction(trades)

    self.callbacks.info(f"Starting execution of {len(trades)} trades...")
    successful_trades = self._execute_trades(trades)

    # Apply stop losses after execution
    sl_result = None
    if self.config.portfolio.stop_loss.sides != "none":
        self.callbacks.info("Applying stop losses to positions...")
        sl_result = self.apply_stop_losses()

        # Report SL results
        if sl_result.get("status") == "ok":
            applied_count = sl_result.get("total_applied", 0)
            if applied_count > 0:
                self.callbacks.info(f"✓ Placed {applied_count} stop loss order(s)")

            # Warn about resting orders
            resting = [t for t in successful_trades if t.get("resting")]
            if resting:
                self.callbacks.warn(
                    f"{len(resting)} order(s) resting on book. "
                    "Run 'cc-liquid apply-stops' after they fill to add protection."
                )

    return {
        "successful_trades": successful_trades,
        "all_trades": trades,
        "stop_loss_result": sl_result
    }

get_account_value()

Retrieves the total account value in USD.

Source code in src/cc_liquid/trader.py
def get_account_value(self) -> float:
    """Retrieves the total account value in USD."""
    user_state = self.get_user_state()
    return float(user_state["marginSummary"]["accountValue"])

get_fee_summary()

Get fee rates and trading volume statistics.

Returns:

Type Description
dict[str, Any]

Dictionary containing fee rates (maker/taker), volume stats, and fee schedule

Source code in src/cc_liquid/trader.py
def get_fee_summary(self) -> dict[str, Any]:
    """Get fee rates and trading volume statistics.

    Returns:
        Dictionary containing fee rates (maker/taker), volume stats, and fee schedule
    """
    owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
    if not owner:
        raise ValueError("Missing portfolio owner address")
    return self.info.user_fees(owner)

get_fill_history(start_time=None, end_time=None)

Get fill history with optional time range.

Parameters:

Name Type Description Default
start_time int | None

Unix timestamp in milliseconds (optional)

None
end_time int | None

Unix timestamp in milliseconds (optional)

None

Returns:

Type Description
list[dict[str, Any]]

List of fills with execution details, prices, sizes, and fees

Source code in src/cc_liquid/trader.py
def get_fill_history(
    self, start_time: int | None = None, end_time: int | None = None
) -> list[dict[str, Any]]:
    """Get fill history with optional time range.

    Args:
        start_time: Unix timestamp in milliseconds (optional)
        end_time: Unix timestamp in milliseconds (optional)

    Returns:
        List of fills with execution details, prices, sizes, and fees
    """
    owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
    if not owner:
        raise ValueError("Missing portfolio owner address")

    if start_time is not None:
        return self.info.user_fills_by_time(owner, start_time, end_time)
    return self.info.user_fills(owner)

get_open_orders()

Get current open orders.

Returns:

Type Description
list[dict[str, Any]]

List of open orders with details like coin, size, limit price, side, etc.

Source code in src/cc_liquid/trader.py
def get_open_orders(self) -> list[dict[str, Any]]:
    """Get current open orders.

    Returns:
        List of open orders with details like coin, size, limit price, side, etc.
    """
    owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
    if not owner:
        raise ValueError("Missing portfolio owner address")
    return self.info.open_orders(owner)

get_portfolio_info()

Get complete portfolio information as structured data.

Source code in src/cc_liquid/trader.py
def get_portfolio_info(self) -> PortfolioInfo:
    """Get complete portfolio information as structured data."""
    try:
        user_state = self.get_user_state()
    except Exception as e:
        self.logger.warning(f"Could not get user state: {e}")
        # Return empty portfolio if we can't connect
        return PortfolioInfo(
            account=AccountInfo(
                account_value=0,
                total_position_value=0,
                margin_used=0,
                free_collateral=0,
                cash_balance=0,
                withdrawable=0,
                current_leverage=0,
            ),
            positions=[],
        )

    margin_summary = user_state.get("marginSummary", {}) if user_state else {}
    all_mids = self.info.all_mids() if user_state else {}

    # Build account info
    account_info = AccountInfo(
        account_value=float(margin_summary.get("accountValue", 0)),
        total_position_value=float(margin_summary.get("totalNtlPos", 0)),
        margin_used=float(margin_summary.get("totalMarginUsed", 0)),
        free_collateral=float(margin_summary.get("accountValue", 0))
        - float(margin_summary.get("totalMarginUsed", 0)),
        cash_balance=float(margin_summary.get("totalRawUsd", 0)),
        withdrawable=float(user_state.get("withdrawable", 0)),
        current_leverage=float(margin_summary.get("totalNtlPos", 0))
        / float(margin_summary.get("accountValue", 1))
        if float(margin_summary.get("accountValue", 0)) > 0
        else 0,
        raw_margin_summary=margin_summary,
    )

    # Add cross margin info if available
    cross_margin = user_state.get("crossMarginSummary")
    if cross_margin:
        account_info.cross_leverage = (
            float(cross_margin.get("accountValue", 0))
            / float(margin_summary.get("accountValue", 1))
            if float(margin_summary.get("accountValue", 0)) > 0
            else 0
        )
        account_info.cross_margin_used = float(
            cross_margin.get("totalMarginUsed", 0)
        )
        account_info.cross_maintenance_margin = float(
            cross_margin.get("totalMaintenanceMargin", 0)
        )
        account_info.raw_cross_margin_summary = cross_margin

    # Build positions list
    positions = []
    for position_data in user_state.get("assetPositions", []):
        pos = position_data.get("position", {})
        size = float(pos.get("szi", 0))

        if size == 0:
            continue

        coin = pos["coin"]
        entry_px = float(pos.get("entryPx", 0))
        mark_px = float(all_mids.get(coin, entry_px))
        position_value = abs(size * mark_px)

        # Calculate unrealized PnL
        if size > 0:
            unrealized_pnl = (mark_px - entry_px) * size
            side = "LONG"
        else:
            unrealized_pnl = (entry_px - mark_px) * abs(size)
            side = "SHORT"

        return_pct = (
            (unrealized_pnl / (abs(size) * entry_px) * 100) if entry_px > 0 else 0
        )

        positions.append(
            Position(
                coin=coin,
                side=side,
                size=abs(size),
                entry_price=entry_px,
                mark_price=mark_px,
                value=position_value,
                unrealized_pnl=unrealized_pnl,
                return_pct=return_pct,
                liquidation_price=float(pos["liquidationPx"])
                if "liquidationPx" in pos and pos["liquidationPx"] is not None
                else None,
                margin_used=float(pos["marginUsed"])
                if "marginUsed" in pos and pos["marginUsed"] is not None
                else None,
            )
        )

    return PortfolioInfo(account=account_info, positions=positions)

get_positions()

Retrieves the user's open positions as a dict.

Source code in src/cc_liquid/trader.py
def get_positions(self) -> dict[str, Any]:
    """Retrieves the user's open positions as a dict."""
    user_state = self.get_user_state()
    positions = {}
    for position_data in user_state.get("assetPositions", []):
        position = position_data.get("position", {})
        if float(position.get("szi", 0)) != 0:
            positions[position["coin"]] = position
    return positions

get_user_state()

Retrieves the current state of the user's account.

Source code in src/cc_liquid/trader.py
def get_user_state(self) -> dict[str, Any]:
    """Retrieves the current state of the user's account."""
    # Always query Info using the portfolio owner: vault (if set) or master address.
    # Never use the agent/signer address for Info, as it has no balances.
    owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
    if not owner:
        raise ValueError(
            "Missing portfolio owner. Set HYPERLIQUID_VAULT_ADDRESS or HYPERLIQUID_ADDRESS."
        )
    return self.info.user_state(owner)

load_state()

Public wrapper to load last rebalance timestamp.

Source code in src/cc_liquid/trader.py
def load_state(self) -> datetime | None:
    """Public wrapper to load last rebalance timestamp."""
    return self._load_state()

plan_close_all_positions(*, force=False)

Plan to close all open positions (return to cash) without executing orders.

Source code in src/cc_liquid/trader.py
def plan_close_all_positions(self, *, force: bool = False) -> dict:
    """Plan to close all open positions (return to cash) without executing orders."""
    current_positions = self.get_positions()

    if not current_positions:
        self.callbacks.info("No open positions to close.")
        return {
            "target_positions": {},
            "trades": [],
            "skipped_trades": [],
            "account_value": self.get_account_value(),
            "leverage": self.config.portfolio.target_leverage,
        }

    self.callbacks.info("Closing all positions to return to cash...")

    # Create target positions of 0 for all current positions
    target_positions = {coin: 0 for coin in current_positions.keys()}
    trades, skipped_trades = self._calculate_trades(
        target_positions, current_positions, force=force
    )

    account_value = self.get_account_value()
    leverage = self.config.portfolio.target_leverage
    return {
        "target_positions": target_positions,
        "trades": trades,
        "skipped_trades": skipped_trades,
        "account_value": account_value,
        "leverage": leverage,
    }

plan_rebalance(predictions=None)

Compute a rebalancing plan without executing orders.

Source code in src/cc_liquid/trader.py
def plan_rebalance(self, predictions: pl.DataFrame | None = None) -> dict:
    """Compute a rebalancing plan without executing orders."""
    # Check for open orders (warning if present)
    open_orders = self.get_open_orders()
    if open_orders:
        self.callbacks.warn(
            f"Found {len(open_orders)} open order(s). These may conflict with rebalancing."
        )
    # Load predictions if not provided
    if predictions is None:
        self.callbacks.info("Loading predictions...")
        predictions = self._load_predictions()

        if predictions is None or predictions.is_empty():
            self.callbacks.error("No predictions available, cannot rebalance")
            return {
                "target_positions": {},
                "trades": [],
                "skipped_trades": [],
                "account_value": 0.0,
                "leverage": self.config.portfolio.target_leverage,
                "open_orders": open_orders,
            }

        # Display prediction info
        unique_assets = predictions[self.config.data.asset_id_column].n_unique()
        latest_data = predictions[self.config.data.date_column].max()
        self.callbacks.info(
            f"Loaded predictions for {unique_assets} assets (latest: {latest_data})"
        )

    # Asset Selection, Position Calculation, and Trade Generation
    target_positions = self._get_target_positions(predictions)
    current_positions = self.get_positions()
    trades, skipped_trades = self._calculate_trades(
        target_positions, current_positions
    )

    # Build plan (including skipped trades)
    account_value = self.get_account_value()
    leverage = self.config.portfolio.target_leverage
    return {
        "target_positions": target_positions,
        "trades": trades,
        "skipped_trades": skipped_trades,
        "account_value": account_value,
        "leverage": leverage,
        "open_orders": open_orders,
    }

save_state(last_rebalance_date)

Public wrapper to persist last rebalance timestamp.

Source code in src/cc_liquid/trader.py
def save_state(self, last_rebalance_date: datetime) -> None:
    """Public wrapper to persist last rebalance timestamp."""
    self._save_state(last_rebalance_date)

Callback protocols

CCLiquidCallbacks

Bases: Protocol

Protocol for trader callbacks to abstract UI/UX concerns.

Source code in src/cc_liquid/callbacks.py
class CCLiquidCallbacks(Protocol):
    """Protocol for trader callbacks to abstract UI/UX concerns."""

    # High-level lifecycle methods
    def ask_confirmation(self, message: str) -> bool:
        """Ask user for confirmation."""
        ...

    def info(self, message: str) -> None:
        """Display info message."""
        ...

    def warn(self, message: str) -> None:
        """Display warning message."""
        ...

    def error(self, message: str) -> None:
        """Display error message."""
        ...

    def on_config_override(self, overrides: list[str]) -> None:
        """Display applied configuration overrides."""
        ...

    # Trade execution progress hooks
    def on_trade_start(self, idx: int, total: int, trade: dict[str, Any]) -> None:
        """Called when a trade execution starts."""
        ...

    def on_trade_fill(
        self, trade: dict[str, Any], fill_data: dict[str, Any], slippage_pct: float
    ) -> None:
        """Called when a trade is filled."""
        ...

    def on_trade_fail(self, trade: dict[str, Any], reason: str) -> None:
        """Called when a trade fails."""
        ...

    def on_batch_complete(self, success: list[dict], failed: list[dict]) -> None:
        """Called when a batch of trades completes."""
        ...

    def show_trade_plan(
        self,
        target_positions: dict,
        trades: list,
        account_value: float,
        leverage: float,
    ) -> None:
        """Display the trade plan before execution."""
        ...

    def show_execution_summary(
        self,
        successful_trades: list[dict],
        all_trades: list[dict],
        target_positions: dict,
        account_value: float,
    ) -> None:
        """Display execution summary after trades complete."""
        ...

ask_confirmation(message)

Ask user for confirmation.

Source code in src/cc_liquid/callbacks.py
def ask_confirmation(self, message: str) -> bool:
    """Ask user for confirmation."""
    ...

error(message)

Display error message.

Source code in src/cc_liquid/callbacks.py
def error(self, message: str) -> None:
    """Display error message."""
    ...

info(message)

Display info message.

Source code in src/cc_liquid/callbacks.py
def info(self, message: str) -> None:
    """Display info message."""
    ...

on_batch_complete(success, failed)

Called when a batch of trades completes.

Source code in src/cc_liquid/callbacks.py
def on_batch_complete(self, success: list[dict], failed: list[dict]) -> None:
    """Called when a batch of trades completes."""
    ...

on_config_override(overrides)

Display applied configuration overrides.

Source code in src/cc_liquid/callbacks.py
def on_config_override(self, overrides: list[str]) -> None:
    """Display applied configuration overrides."""
    ...

on_trade_fail(trade, reason)

Called when a trade fails.

Source code in src/cc_liquid/callbacks.py
def on_trade_fail(self, trade: dict[str, Any], reason: str) -> None:
    """Called when a trade fails."""
    ...

on_trade_fill(trade, fill_data, slippage_pct)

Called when a trade is filled.

Source code in src/cc_liquid/callbacks.py
def on_trade_fill(
    self, trade: dict[str, Any], fill_data: dict[str, Any], slippage_pct: float
) -> None:
    """Called when a trade is filled."""
    ...

on_trade_start(idx, total, trade)

Called when a trade execution starts.

Source code in src/cc_liquid/callbacks.py
def on_trade_start(self, idx: int, total: int, trade: dict[str, Any]) -> None:
    """Called when a trade execution starts."""
    ...

show_execution_summary(successful_trades, all_trades, target_positions, account_value)

Display execution summary after trades complete.

Source code in src/cc_liquid/callbacks.py
def show_execution_summary(
    self,
    successful_trades: list[dict],
    all_trades: list[dict],
    target_positions: dict,
    account_value: float,
) -> None:
    """Display execution summary after trades complete."""
    ...

show_trade_plan(target_positions, trades, account_value, leverage)

Display the trade plan before execution.

Source code in src/cc_liquid/callbacks.py
def show_trade_plan(
    self,
    target_positions: dict,
    trades: list,
    account_value: float,
    leverage: float,
) -> None:
    """Display the trade plan before execution."""
    ...

warn(message)

Display warning message.

Source code in src/cc_liquid/callbacks.py
def warn(self, message: str) -> None:
    """Display warning message."""
    ...