Skip to content

Python API

This package is primarily a CLI. The core modules are documented below for advanced users.

Config

Config dataclass

Manages configuration for the trading bot, loading from a YAML file and environment variables.

Source code in src/cc_liquid/config.py
@dataclass
class Config:
    """
    Manages configuration for the trading bot, loading from a YAML file
    and environment variables.
    """

    # Private Keys and API Credentials (from .env)
    CROWDCENT_API_KEY: str | None = None
    HYPERLIQUID_PRIVATE_KEY: str | None = (
        None  # Private key for signing (owner or approved agent wallet)
    )

    # Environment
    is_testnet: bool = False
    base_url: str = "https://api.hyperliquid.xyz"

    # Profiles (addresses in config; secrets remain in env)
    active_profile: str | None = "default"
    profiles: dict[str, Any] = field(default_factory=dict)

    # Resolved addresses from active profile (owner/vault)
    HYPERLIQUID_ADDRESS: str | None = None
    HYPERLIQUID_VAULT_ADDRESS: str | None = None

    # Nested Configs
    data: DataSourceConfig = field(default_factory=DataSourceConfig)
    portfolio: PortfolioConfig = field(default_factory=PortfolioConfig)
    execution: ExecutionConfig = field(default_factory=ExecutionConfig)

    def __post_init__(self):
        """Load environment variables and YAML config after initialization."""
        self._load_env_vars()
        self._load_yaml_config()
        self._resolve_profile()  # Must come AFTER loading YAML (which loads profiles)
        self._set_base_url()
        self._validate()

    def _load_env_vars(self):
        """Load secrets-only from .env; addresses come from YAML/profiles."""
        load_dotenv()
        self.CROWDCENT_API_KEY = os.getenv("CROWDCENT_API_KEY")
        # Don't load private key here - will be resolved based on profile's signer_env

    def _load_yaml_config(self, config_path: str | None = None):
        """Loads and overrides config from a YAML file."""
        path = config_path or DEFAULT_CONFIG_PATH
        if os.path.exists(path):
            with open(path) as f:
                yaml_config: dict[str, Any] = yaml.safe_load(f) or {}

            for key, value in yaml_config.items():
                if hasattr(self, key):
                    # Handle nested dataclasses
                    if isinstance(value, dict) and is_dataclass(getattr(self, key)):
                        nested_config_obj = getattr(self, key)
                        for nested_key, nested_value in value.items():
                            if hasattr(nested_config_obj, nested_key):
                                # Handle double nested dataclasses
                                if isinstance(nested_value, dict) and is_dataclass(
                                    getattr(nested_config_obj, nested_key)
                                ):
                                    nested_dataclass = getattr(
                                        nested_config_obj, nested_key
                                    )
                                    for deep_key, deep_value in nested_value.items():
                                        if hasattr(nested_dataclass, deep_key):
                                            setattr(
                                                nested_dataclass, deep_key, deep_value
                                            )
                                else:
                                    setattr(nested_config_obj, nested_key, nested_value)
                    else:
                        # Direct assignment for non-dataclass fields (like profiles dict)
                        setattr(self, key, value)

    def _set_base_url(self):
        """Sets the base URL based on the is_testnet flag."""
        if self.is_testnet:
            self.base_url = "https://api.hyperliquid-testnet.xyz"

    def _resolve_profile(self):
        """Resolve owner/vault addresses and signer key from the active profile."""
        # If no profiles defined, skip resolution
        if not self.profiles:
            return

        active = self.active_profile or "default"
        profile = self.profiles.get(active, {})

        # Extract owner and vault from profile
        owner = profile.get("owner")
        vault = profile.get("vault")

        # Set addresses (owner is required, vault is optional)
        self.HYPERLIQUID_ADDRESS = owner
        self.HYPERLIQUID_VAULT_ADDRESS = vault

        # Resolve signer key from environment based on profile's signer_env
        signer_env = profile.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
        self.HYPERLIQUID_PRIVATE_KEY = os.getenv(signer_env)

        # Fallback to default env var if custom signer_env not found
        if not self.HYPERLIQUID_PRIVATE_KEY and signer_env != "HYPERLIQUID_PRIVATE_KEY":
            self.HYPERLIQUID_PRIVATE_KEY = os.getenv("HYPERLIQUID_PRIVATE_KEY")

    def refresh_runtime(self):
        """Refresh runtime configuration after changes (e.g., CLI overrides)."""
        self._set_base_url()
        self._resolve_profile()
        self._validate()

    def _validate(self):
        """Validate that required configuration is present.

        Note: This is lenient during initial module load to allow CLI commands
        like 'profile list' to work even without complete setup.
        """
        # Check if active profile exists
        if self.profiles and self.active_profile:
            if self.active_profile not in self.profiles:
                available = ", ".join(sorted(self.profiles.keys()))
                raise ValueError(
                    f"Active profile '{self.active_profile}' not found. Available profiles: {available}"
                )

        # Don't validate addresses/keys during module import - let individual commands handle it
        # This allows 'profile list', 'config', etc to work without full setup

    def validate_for_trading(self):
        """Strict validation for trading operations.

        Call this before any trading operations to ensure all required config is present.
        """
        # Validate we have required addresses from profile
        if not self.HYPERLIQUID_ADDRESS and not self.HYPERLIQUID_VAULT_ADDRESS:
            raise ValueError(
                "Profile must specify 'owner' or 'vault' address in cc-liquid-config.yaml"
            )

        # Validate we have a private key
        if not self.HYPERLIQUID_PRIVATE_KEY:
            # Better error message showing which env var is expected
            signer_env = "HYPERLIQUID_PRIVATE_KEY"
            if self.profiles and self.active_profile:
                profile = self.profiles.get(self.active_profile, {})
                signer_env = profile.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
            raise ValueError(
                f"Private key not found. Set '{signer_env}' in your .env file."
            )

    def to_dict(self) -> dict[str, Any]:
        """Return a dictionary representation of the config."""
        portfolio_dict = self.portfolio.__dict__.copy()
        # Convert nested dataclass to dict
        if hasattr(self.portfolio, "rebalancing"):
            portfolio_dict["rebalancing"] = self.portfolio.rebalancing.__dict__

        # Profile summary (non-secret)
        active_profile = self.active_profile
        prof = self.profiles.get(active_profile) if self.profiles else {}
        signer_env_name = (
            prof.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
            if prof
            else "HYPERLIQUID_PRIVATE_KEY"
        )
        profile_dict = {
            "active": active_profile,
            "owner": self.HYPERLIQUID_ADDRESS,
            "vault": self.HYPERLIQUID_VAULT_ADDRESS,
            "signer_env": signer_env_name,
        }

        return {
            "is_testnet": self.is_testnet,
            "profile": profile_dict,
            "data": self.data.__dict__,
            "portfolio": portfolio_dict,
            "execution": self.execution.__dict__,
        }

__post_init__()

Load environment variables and YAML config after initialization.

Source code in src/cc_liquid/config.py
def __post_init__(self):
    """Load environment variables and YAML config after initialization."""
    self._load_env_vars()
    self._load_yaml_config()
    self._resolve_profile()  # Must come AFTER loading YAML (which loads profiles)
    self._set_base_url()
    self._validate()

refresh_runtime()

Refresh runtime configuration after changes (e.g., CLI overrides).

Source code in src/cc_liquid/config.py
def refresh_runtime(self):
    """Refresh runtime configuration after changes (e.g., CLI overrides)."""
    self._set_base_url()
    self._resolve_profile()
    self._validate()

to_dict()

Return a dictionary representation of the config.

Source code in src/cc_liquid/config.py
def to_dict(self) -> dict[str, Any]:
    """Return a dictionary representation of the config."""
    portfolio_dict = self.portfolio.__dict__.copy()
    # Convert nested dataclass to dict
    if hasattr(self.portfolio, "rebalancing"):
        portfolio_dict["rebalancing"] = self.portfolio.rebalancing.__dict__

    # Profile summary (non-secret)
    active_profile = self.active_profile
    prof = self.profiles.get(active_profile) if self.profiles else {}
    signer_env_name = (
        prof.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
        if prof
        else "HYPERLIQUID_PRIVATE_KEY"
    )
    profile_dict = {
        "active": active_profile,
        "owner": self.HYPERLIQUID_ADDRESS,
        "vault": self.HYPERLIQUID_VAULT_ADDRESS,
        "signer_env": signer_env_name,
    }

    return {
        "is_testnet": self.is_testnet,
        "profile": profile_dict,
        "data": self.data.__dict__,
        "portfolio": portfolio_dict,
        "execution": self.execution.__dict__,
    }

validate_for_trading()

Strict validation for trading operations.

Call this before any trading operations to ensure all required config is present.

Source code in src/cc_liquid/config.py
def validate_for_trading(self):
    """Strict validation for trading operations.

    Call this before any trading operations to ensure all required config is present.
    """
    # Validate we have required addresses from profile
    if not self.HYPERLIQUID_ADDRESS and not self.HYPERLIQUID_VAULT_ADDRESS:
        raise ValueError(
            "Profile must specify 'owner' or 'vault' address in cc-liquid-config.yaml"
        )

    # Validate we have a private key
    if not self.HYPERLIQUID_PRIVATE_KEY:
        # Better error message showing which env var is expected
        signer_env = "HYPERLIQUID_PRIVATE_KEY"
        if self.profiles and self.active_profile:
            profile = self.profiles.get(self.active_profile, {})
            signer_env = profile.get("signer_env", "HYPERLIQUID_PRIVATE_KEY")
        raise ValueError(
            f"Private key not found. Set '{signer_env}' in your .env file."
        )

Data loading

DataLoader

Factory for creating data sources.

Source code in src/cc_liquid/data_loader.py
class DataLoader:
    """Factory for creating data sources."""

    @staticmethod
    def from_file(path: str, date_col: str, id_col: str, pred_col: str) -> pl.DataFrame:
        """Create a file data source and load data."""
        return FileDataSource(
            path,
            date_column=date_col,
            asset_id_column=id_col,
            prediction_column=pred_col,
        ).load()

    @staticmethod
    def from_dataframe(
        df: pl.DataFrame, date_col: str, id_col: str, pred_col: str
    ) -> pl.DataFrame:
        """Create a DataFrame data source and load data."""
        return DataFrameDataSource(
            df,
            date_column=date_col,
            asset_id_column=id_col,
            prediction_column=pred_col,
        ).load()

    @staticmethod
    def from_crowdcent_api(
        api_key: str | None = None,
        challenge_slug: str = "hyperliquid-ranking",
        download_path: str | None = None,
        date_col: str = "release_date",
        id_col: str = "id",
        pred_col: str = "pred_10d",
    ) -> pl.DataFrame:
        """
        Download and load the CrowdCent meta model.

        Args:
            api_key: CrowdCent API key (if None, will try to load from env)
            challenge_slug: The challenge to download data for
            download_path: Optional path to save the downloaded file
            date_col: Date column name in the meta model
            id_col: Asset ID column name in the meta model
            pred_col: Prediction column name to use from the meta model

        Returns:
            Polars DataFrame with original column names
        """
        from crowdcent_challenge import ChallengeClient

        if api_key is None:
            import os

            api_key = os.getenv("CROWDCENT_API_KEY")
            if not api_key:
                raise ValueError("CROWDCENT_API_KEY not found in environment variables")

        client = ChallengeClient(challenge_slug=challenge_slug, api_key=api_key)

        if download_path is None:
            download_path = "predictions.parquet"

        client.download_meta_model(download_path)

        return DataLoader.from_file(
            path=download_path, date_col=date_col, id_col=id_col, pred_col=pred_col
        )

    @staticmethod
    def from_numerai_api(
        download_path: str | None = None,
        date_col: str = "date",
        id_col: str = "symbol",
        pred_col: str = "meta_model",
    ) -> pl.DataFrame:
        """
        Download and load the Numerai crypto meta model.

        Args:
            download_path: Optional path to save the downloaded file
            date_col: Date column name in the meta model
            id_col: Asset ID/symbol column name in the meta model
            pred_col: Prediction column name to use from the meta model

        Returns:
            Polars DataFrame with original column names
        """
        try:
            from numerapi import CryptoAPI
        except ImportError:
            raise ImportError(
                "numerapi is required. Install with: uv add cc-liquid[numerai]"
            )

        api = CryptoAPI()

        if download_path is None:
            download_path = "predictions.parquet"

        api.download_dataset("v1.0/historical_meta_models.parquet", download_path)

        return DataLoader.from_file(
            path=download_path, date_col=date_col, id_col=id_col, pred_col=pred_col
        )

from_crowdcent_api(api_key=None, challenge_slug='hyperliquid-ranking', download_path=None, date_col='release_date', id_col='id', pred_col='pred_10d') staticmethod

Download and load the CrowdCent meta model.

Parameters:

Name Type Description Default
api_key str | None

CrowdCent API key (if None, will try to load from env)

None
challenge_slug str

The challenge to download data for

'hyperliquid-ranking'
download_path str | None

Optional path to save the downloaded file

None
date_col str

Date column name in the meta model

'release_date'
id_col str

Asset ID column name in the meta model

'id'
pred_col str

Prediction column name to use from the meta model

'pred_10d'

Returns:

Type Description
DataFrame

Polars DataFrame with original column names

Source code in src/cc_liquid/data_loader.py
@staticmethod
def from_crowdcent_api(
    api_key: str | None = None,
    challenge_slug: str = "hyperliquid-ranking",
    download_path: str | None = None,
    date_col: str = "release_date",
    id_col: str = "id",
    pred_col: str = "pred_10d",
) -> pl.DataFrame:
    """
    Download and load the CrowdCent meta model.

    Args:
        api_key: CrowdCent API key (if None, will try to load from env)
        challenge_slug: The challenge to download data for
        download_path: Optional path to save the downloaded file
        date_col: Date column name in the meta model
        id_col: Asset ID column name in the meta model
        pred_col: Prediction column name to use from the meta model

    Returns:
        Polars DataFrame with original column names
    """
    from crowdcent_challenge import ChallengeClient

    if api_key is None:
        import os

        api_key = os.getenv("CROWDCENT_API_KEY")
        if not api_key:
            raise ValueError("CROWDCENT_API_KEY not found in environment variables")

    client = ChallengeClient(challenge_slug=challenge_slug, api_key=api_key)

    if download_path is None:
        download_path = "predictions.parquet"

    client.download_meta_model(download_path)

    return DataLoader.from_file(
        path=download_path, date_col=date_col, id_col=id_col, pred_col=pred_col
    )

from_dataframe(df, date_col, id_col, pred_col) staticmethod

Create a DataFrame data source and load data.

Source code in src/cc_liquid/data_loader.py
@staticmethod
def from_dataframe(
    df: pl.DataFrame, date_col: str, id_col: str, pred_col: str
) -> pl.DataFrame:
    """Create a DataFrame data source and load data."""
    return DataFrameDataSource(
        df,
        date_column=date_col,
        asset_id_column=id_col,
        prediction_column=pred_col,
    ).load()

from_file(path, date_col, id_col, pred_col) staticmethod

Create a file data source and load data.

Source code in src/cc_liquid/data_loader.py
@staticmethod
def from_file(path: str, date_col: str, id_col: str, pred_col: str) -> pl.DataFrame:
    """Create a file data source and load data."""
    return FileDataSource(
        path,
        date_column=date_col,
        asset_id_column=id_col,
        prediction_column=pred_col,
    ).load()

from_numerai_api(download_path=None, date_col='date', id_col='symbol', pred_col='meta_model') staticmethod

Download and load the Numerai crypto meta model.

Parameters:

Name Type Description Default
download_path str | None

Optional path to save the downloaded file

None
date_col str

Date column name in the meta model

'date'
id_col str

Asset ID/symbol column name in the meta model

'symbol'
pred_col str

Prediction column name to use from the meta model

'meta_model'

Returns:

Type Description
DataFrame

Polars DataFrame with original column names

Source code in src/cc_liquid/data_loader.py
@staticmethod
def from_numerai_api(
    download_path: str | None = None,
    date_col: str = "date",
    id_col: str = "symbol",
    pred_col: str = "meta_model",
) -> pl.DataFrame:
    """
    Download and load the Numerai crypto meta model.

    Args:
        download_path: Optional path to save the downloaded file
        date_col: Date column name in the meta model
        id_col: Asset ID/symbol column name in the meta model
        pred_col: Prediction column name to use from the meta model

    Returns:
        Polars DataFrame with original column names
    """
    try:
        from numerapi import CryptoAPI
    except ImportError:
        raise ImportError(
            "numerapi is required. Install with: uv add cc-liquid[numerai]"
        )

    api = CryptoAPI()

    if download_path is None:
        download_path = "predictions.parquet"

    api.download_dataset("v1.0/historical_meta_models.parquet", download_path)

    return DataLoader.from_file(
        path=download_path, date_col=date_col, id_col=id_col, pred_col=pred_col
    )

Trading logic

CCLiquid

Handles all interactions with the Hyperliquid exchange.

Source code in src/cc_liquid/trader.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
class CCLiquid:
    """
    Handles all interactions with the Hyperliquid exchange.
    """

    def __init__(
        self,
        config: Config,
        callbacks: CCLiquidCallbacks | None = None,
        skip_ws: bool = True,
    ):
        self.config = config
        self.callbacks = callbacks or NoOpCallbacks()

        # Validate config for trading operations
        self.config.validate_for_trading()

        self.account: LocalAccount = self._get_account()
        self.exchange = Exchange(
            self.account,
            self.config.base_url,
            vault_address=(self.config.HYPERLIQUID_VAULT_ADDRESS or None),
            account_address=self.config.HYPERLIQUID_ADDRESS,
        )
        self.info = Info(self.config.base_url, skip_ws=skip_ws)
        self.logger = logging.getLogger(__name__)
        # Lazy-loaded map of coin -> szDecimals from Info.meta()["universe"].
        # Perps only: Hyperliquid perps use max 6 decimals for price rules.
        self._coin_to_sz_decimals: dict[str, int] | None = None

    def _get_account(self) -> LocalAccount:
        """Creates an eth_account LocalAccount object from the private key."""
        from eth_account import Account

        return Account.from_key(self.config.HYPERLIQUID_PRIVATE_KEY)

    def get_user_state(self) -> dict[str, Any]:
        """Retrieves the current state of the user's account."""
        # Always query Info using the portfolio owner: vault (if set) or master address.
        # Never use the agent/signer address for Info, as it has no balances.
        owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
        if not owner:
            raise ValueError(
                "Missing portfolio owner. Set HYPERLIQUID_VAULT_ADDRESS or HYPERLIQUID_ADDRESS."
            )
        return self.info.user_state(owner)

    def get_positions(self) -> dict[str, Any]:
        """Retrieves the user's open positions as a dict."""
        user_state = self.get_user_state()
        positions = {}
        for position_data in user_state.get("assetPositions", []):
            position = position_data.get("position", {})
            if float(position.get("szi", 0)) != 0:
                positions[position["coin"]] = position
        return positions

    def get_account_value(self) -> float:
        """Retrieves the total account value in USD."""
        user_state = self.get_user_state()
        return float(user_state["marginSummary"]["accountValue"])

    def get_portfolio_info(self) -> PortfolioInfo:
        """Get complete portfolio information as structured data."""
        try:
            user_state = self.get_user_state()
        except Exception as e:
            self.logger.warning(f"Could not get user state: {e}")
            # Return empty portfolio if we can't connect
            return PortfolioInfo(
                account=AccountInfo(
                    account_value=0,
                    total_position_value=0,
                    margin_used=0,
                    free_collateral=0,
                    cash_balance=0,
                    withdrawable=0,
                    current_leverage=0,
                ),
                positions=[],
            )

        margin_summary = user_state.get("marginSummary", {}) if user_state else {}
        all_mids = self.info.all_mids() if user_state else {}

        # Build account info
        account_info = AccountInfo(
            account_value=float(margin_summary.get("accountValue", 0)),
            total_position_value=float(margin_summary.get("totalNtlPos", 0)),
            margin_used=float(margin_summary.get("totalMarginUsed", 0)),
            free_collateral=float(margin_summary.get("accountValue", 0))
            - float(margin_summary.get("totalMarginUsed", 0)),
            cash_balance=float(margin_summary.get("totalRawUsd", 0)),
            withdrawable=float(user_state.get("withdrawable", 0)),
            current_leverage=float(margin_summary.get("totalNtlPos", 0))
            / float(margin_summary.get("accountValue", 1))
            if float(margin_summary.get("accountValue", 0)) > 0
            else 0,
            raw_margin_summary=margin_summary,
        )

        # Add cross margin info if available
        cross_margin = user_state.get("crossMarginSummary")
        if cross_margin:
            account_info.cross_leverage = (
                float(cross_margin.get("accountValue", 0))
                / float(margin_summary.get("accountValue", 1))
                if float(margin_summary.get("accountValue", 0)) > 0
                else 0
            )
            account_info.cross_margin_used = float(
                cross_margin.get("totalMarginUsed", 0)
            )
            account_info.cross_maintenance_margin = float(
                cross_margin.get("totalMaintenanceMargin", 0)
            )
            account_info.raw_cross_margin_summary = cross_margin

        # Build positions list
        positions = []
        for position_data in user_state.get("assetPositions", []):
            pos = position_data.get("position", {})
            size = float(pos.get("szi", 0))

            if size == 0:
                continue

            coin = pos["coin"]
            entry_px = float(pos.get("entryPx", 0))
            mark_px = float(all_mids.get(coin, entry_px))
            position_value = abs(size * mark_px)

            # Calculate unrealized PnL
            if size > 0:
                unrealized_pnl = (mark_px - entry_px) * size
                side = "LONG"
            else:
                unrealized_pnl = (entry_px - mark_px) * abs(size)
                side = "SHORT"

            return_pct = (
                (unrealized_pnl / (abs(size) * entry_px) * 100) if entry_px > 0 else 0
            )

            positions.append(
                Position(
                    coin=coin,
                    side=side,
                    size=abs(size),
                    entry_price=entry_px,
                    mark_price=mark_px,
                    value=position_value,
                    unrealized_pnl=unrealized_pnl,
                    return_pct=return_pct,
                    liquidation_price=float(pos["liquidationPx"])
                    if "liquidationPx" in pos and pos["liquidationPx"] is not None
                    else None,
                    margin_used=float(pos["marginUsed"])
                    if "marginUsed" in pos and pos["marginUsed"] is not None
                    else None,
                )
            )

        return PortfolioInfo(account=account_info, positions=positions)

    # --- Rounding helpers (Perps only) ---
    def _load_sz_decimals_map(self, force_refresh: bool = False) -> dict[str, int]:
        """Load and cache coin -> szDecimals from exchange meta.

        Per Hyperliquid rounding guidance for orders, sizes must be rounded to a
        coin-specific number of decimals (szDecimals). We cache from `info.meta()`
        and refresh on demand.
        """
        if self._coin_to_sz_decimals is None or force_refresh:
            try:
                universe = self.info.meta().get("universe", [])
                self._coin_to_sz_decimals = {
                    asset.get("name"): int(asset.get("szDecimals", 2))
                    for asset in universe
                    if asset.get("name") and not asset.get("isDelisted", False)
                }
            except Exception as e:
                self.logger.warning(f"Failed to load szDecimals map: {e}")
                self._coin_to_sz_decimals = {}
        return self._coin_to_sz_decimals

    def _get_sz_decimals(self, coin: str) -> int | None:
        """Return szDecimals for the given coin, refreshing meta once if needed."""
        sz_map = self._load_sz_decimals_map()
        if coin not in sz_map:
            sz_map = self._load_sz_decimals_map(force_refresh=True)
        return sz_map.get(coin)

    def _round_size(self, coin: str, size: float) -> tuple[float, int] | None:
        """Round size per coin's szDecimals.

        Returns (rounded_size, sz_decimals) or None if szDecimals are unknown.
        """
        sz_decimals = self._get_sz_decimals(coin)
        if sz_decimals is None:
            return None
        return round(size, sz_decimals), sz_decimals

    def _round_price_perp(self, coin: str, px: float) -> float:
        """Round price according to Hyperliquid perp rules (not used for market orders).

        Rules (per Hyperliquid):
        - If px > 100_000: round to integer.
        - Else: round to 5 significant figures and at most (6 - szDecimals) decimals.
        Reference: Hyperliquid SDK example rounding: see rounding.py
        """
        if px > 100_000:
            return round(px)
        sz_decimals = self._get_sz_decimals(coin)
        # If unknown, still limit to 5 significant figures as a safe default.
        if sz_decimals is None:
            return float(f"{px:.5g}")
        max_decimals = 6  # perps
        return round(float(f"{px:.5g}"), max_decimals - sz_decimals)

    def plan_rebalance(self, predictions: pl.DataFrame | None = None) -> dict:
        """Compute a rebalancing plan without executing orders."""
        # Load predictions if not provided
        if predictions is None:
            self.callbacks.info("Loading predictions...")
            predictions = self._load_predictions()

            if predictions is None or predictions.is_empty():
                self.callbacks.error("No predictions available, cannot rebalance")
                return {
                    "target_positions": {},
                    "trades": [],
                    "skipped_trades": [],
                    "account_value": 0.0,
                    "leverage": self.config.portfolio.target_leverage,
                }

            # Display prediction info
            unique_assets = predictions[self.config.data.asset_id_column].n_unique()
            latest_data = predictions[self.config.data.date_column].max()
            self.callbacks.info(
                f"Loaded predictions for {unique_assets} assets (latest: {latest_data})"
            )

        # Asset Selection, Position Calculation, and Trade Generation
        target_positions = self._get_target_positions(predictions)
        current_positions = self.get_positions()
        trades, skipped_trades = self._calculate_trades(
            target_positions, current_positions
        )

        # Build plan (including skipped trades)
        account_value = self.get_account_value()
        leverage = self.config.portfolio.target_leverage
        return {
            "target_positions": target_positions,
            "trades": trades,
            "skipped_trades": skipped_trades,
            "account_value": account_value,
            "leverage": leverage,
        }

    def execute_plan(self, plan: dict) -> dict:
        """Execute a precomputed plan, returning structured results."""
        trades: list[dict] = plan.get("trades", [])
        if not trades:
            # Nothing to do
            return {"successful_trades": [], "all_trades": trades}

        # Prioritize leverage reduction: execute closes/reductions (and flips) before opens
        trades = self._sort_trades_for_leverage_reduction(trades)

        self.callbacks.info(f"Starting execution of {len(trades)} trades...")
        successful_trades = self._execute_trades(trades)
        return {"successful_trades": successful_trades, "all_trades": trades}

    def plan_close_all_positions(self, *, force: bool = False) -> dict:
        """Plan to close all open positions (return to cash) without executing orders."""
        current_positions = self.get_positions()

        if not current_positions:
            self.callbacks.info("No open positions to close.")
            return {
                "target_positions": {},
                "trades": [],
                "skipped_trades": [],
                "account_value": self.get_account_value(),
                "leverage": self.config.portfolio.target_leverage,
            }

        self.callbacks.info("Closing all positions to return to cash...")

        # Create target positions of 0 for all current positions
        target_positions = {coin: 0 for coin in current_positions.keys()}
        trades, skipped_trades = self._calculate_trades(
            target_positions, current_positions, force=force
        )

        account_value = self.get_account_value()
        leverage = self.config.portfolio.target_leverage
        return {
            "target_positions": target_positions,
            "trades": trades,
            "skipped_trades": skipped_trades,
            "account_value": account_value,
            "leverage": leverage,
        }

    def _get_target_positions(self, predictions: pl.DataFrame) -> dict[str, float]:
        """Calculates the target notional value for each position using equal weighting."""
        latest_predictions = self._get_latest_predictions(predictions)
        long_assets = self._select_assets(latest_predictions, descending=True)
        short_assets = self._select_assets(latest_predictions, descending=False)

        account_value = self.get_account_value()
        total_positions = (
            self.config.portfolio.num_long + self.config.portfolio.num_short
        )

        # When using leverage, we can allocate more notional value per position
        # This effectively creates leveraged positions by sizing them larger than account equity
        target_leverage = self.config.portfolio.target_leverage
        position_value = (account_value * target_leverage) / total_positions

        # Log the calculation for transparency
        self.callbacks.info(
            f"Position sizing: ${account_value:.2f} × {target_leverage}x leverage "
            f{total_positions} positions = ${position_value:.2f} per position"
        )

        # Warn if position sizes will be too small
        min_position_value = self.config.execution.min_trade_value
        if position_value < min_position_value:
            msg = "Warning: Position size too small!\n"
            msg += f"   Account value: ${account_value:.2f}\n"
            if target_leverage > 1.0:
                msg += f"   With {target_leverage}x leverage: ${account_value * target_leverage:.2f} total exposure\n"
            msg += f"   Divided by {total_positions} positions = ${position_value:.2f} per position\n"
            msg += f"   Minimum required: ${min_position_value}\n\n"
            msg += "   Solutions:\n"
            msg += f"   • Reduce positions (currently {self.config.portfolio.num_long} long + {self.config.portfolio.num_short} short)\n"
            msg += "   • Increase account value\n"
            if target_leverage < 5.0:
                msg += f"   • Increase leverage (currently {target_leverage}x)\n"
            self.callbacks.warn(msg)

        target_positions = {}
        for asset in long_assets:
            target_positions[asset] = position_value
        for asset in short_assets:
            target_positions[asset] = -position_value

        return target_positions

    def _get_latest_predictions(self, predictions: pl.DataFrame) -> pl.DataFrame:
        """Filters for the latest predictions for each asset by date."""
        return (
            predictions.sort(self.config.data.date_column, descending=True)
            .group_by(self.config.data.asset_id_column)
            .first()
        )

    def _select_assets(self, predictions: pl.DataFrame, descending: bool) -> list[str]:
        """Selects top or bottom N assets based on prediction score,
        filtered to only assets currently listed on Hyperliquid."""

        # Use listed assets from /info meta
        universe = self.info.meta()["universe"]
        available_assets = {
            p["name"] for p in universe if not p.get("isDelisted", False)
        }

        # Filter predictions to only tradeable assets
        tradeable_predictions = predictions.filter(
            pl.col(self.config.data.asset_id_column).is_in(available_assets)
        )

        if tradeable_predictions.height == 0:
            self.logger.warning("No predictions match Hyperliquid tradeable assets!")
            self.callbacks.error(
                "Error: No predictions match Hyperliquid tradeable assets!"
            )
            self.callbacks.info(
                f"Available on Hyperliquid: {sorted(list(available_assets)[:10])}{'...' if len(available_assets) > 10 else ''}"
            )
            prediction_assets = (
                predictions[self.config.data.asset_id_column].unique().to_list()
            )
            self.callbacks.info(
                f"In predictions: {sorted(prediction_assets[:10])}{'...' if len(prediction_assets) > 10 else ''}"
            )
            return []

        # Select from filtered set
        num_assets = (
            self.config.portfolio.num_long
            if descending
            else self.config.portfolio.num_short
        )

        # Warn if we don't have enough tradeable assets
        if tradeable_predictions.height < num_assets:
            self.callbacks.warn(
                f"Warning: Only {tradeable_predictions.height} tradeable assets available, requested {num_assets}"
            )
            self.callbacks.info(
                f"Will use all {tradeable_predictions.height} available assets"
            )

        selected = (
            tradeable_predictions.sort(
                self.config.data.prediction_column, descending=descending
            )
            .head(num_assets)[self.config.data.asset_id_column]
            .to_list()
        )

        return selected

    def _calculate_trades(
        self,
        target_positions: dict[str, float],
        current_positions: dict[str, Any],
        *,
        force: bool = False,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Calculates the trades required to reach the target portfolio using market orders.

        Returns:
            (executable_trades, skipped_trades) - trades that can be executed and those below minimum
        """
        trades = []
        skipped_trades = []  # Track trades we can't execute
        all_mids = self.info.all_mids()

        all_assets = set(target_positions.keys()) | set(current_positions.keys())

        for asset in all_assets:
            target_value = target_positions.get(asset, 0)

            # Get current position details
            current_position = current_positions.get(asset, {})
            current_size = float(current_position.get("szi", 0))

            # Ensure we have a mid price; otherwise skip
            if asset not in all_mids:
                skipped_trades.append(
                    {
                        "coin": asset,
                        "target_value": target_value,
                        "skipped": True,
                        "skip_reason": "No mid price available",
                    }
                )
                continue

            price = float(all_mids[asset])

            # Calculate current value with proper sign
            # szi is positive for long, negative for short
            current_value = current_size * price

            # Calculate the value delta we need to achieve
            delta_value = target_value - current_value

            # Determine trade direction
            # If delta_value > 0, we need to buy (increase position or reduce short)
            # If delta_value < 0, we need to sell (decrease position or increase short)
            is_buy = delta_value > 0
            size = abs(delta_value) / price

            # Round the size using szDecimals from meta (perps only)
            coin = asset
            rounded = self._round_size(coin, size)
            if rounded is None:
                skipped_trades.append(
                    {
                        "coin": asset,
                        "target_value": target_value,
                        "skipped": True,
                        "skip_reason": "Unknown szDecimals (meta)",
                    }
                )
                continue
            size, sz_decimals = rounded

            # If rounding collapses to zero, skip
            if size == 0:
                skipped_trades.append(
                    {
                        "coin": asset,
                        "target_value": target_value,
                        "skipped": True,
                        "skip_reason": f"Rounded size is 0 at {sz_decimals} dp",
                    }
                )
                continue

            # Check if trade is below minimum value threshold
            min_trade_value = self.config.execution.min_trade_value
            # Classify the trade type for clearer downstream handling
            # Types: open, close, reduce, increase, flip
            trade_type: str
            if current_value == 0:
                trade_type = "open" if target_value != 0 else "increase"
            elif target_value == 0:
                trade_type = "close"
            else:
                same_sign = (current_value > 0 and target_value > 0) or (
                    current_value < 0 and target_value < 0
                )
                if same_sign:
                    trade_type = (
                        "reduce"
                        if abs(target_value) < abs(current_value)
                        else "increase"
                    )
                else:
                    trade_type = "flip"

            trade_data = {
                "coin": asset,
                "is_buy": is_buy,
                "sz": size,
                "price": price,
                "current_value": current_value,
                "target_value": target_value,
                "delta_value": delta_value,
                "type": trade_type,
            }

            # Re-evaluate min notional AFTER rounding size
            if abs(size * price) < min_trade_value:
                # Below minimum. If not forcing or not a pure close-to-zero scenario, skip.
                if not force or target_value != 0:
                    trade_data["skipped"] = True
                    trade_data["skip_reason"] = f"Below minimum ${min_trade_value}"
                    skipped_trades.append(trade_data)
                else:
                    forced, reason = self._compose_force_close_trades(
                        asset, price, current_value, min_trade_value
                    )
                    if forced is None:
                        skipped_trades.append(
                            {
                                "coin": asset,
                                "target_value": target_value,
                                "skipped": True,
                                "skip_reason": reason
                                or "Force close composition failed",
                            }
                        )
                    else:
                        trades.extend(forced)
            else:
                # Add to executable trades
                trades.append(trade_data)

        return trades, skipped_trades

    def _sort_trades_for_leverage_reduction(
        self, trades: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Return trades ordered to reduce leverage first using explicit trade types.

        Priority: close (0), reduce/flip (1), increase (2), open (3). Stable ordering within groups.
        """
        priority = {"close": 0, "reduce": 1, "flip": 1, "increase": 2, "open": 3}

        def sort_key(t: dict[str, Any]):
            # Forced close chains must execute in sequence: increase (0) then close (1)
            if t.get("force"):
                return (0, t.get("force_id", ""), t.get("force_seq", 0))
            return (1, priority.get(t.get("type", "increase"), 2), 0)

        return sorted(trades, key=sort_key)

    def _compose_force_close_trades(
        self, coin: str, price: float, current_value: float, min_trade_value: float
    ) -> tuple[list[dict[str, Any]] | None, str | None]:
        """Compose the two-step forced close for sub-minimum closes.
        i.e. if we have a position of less than $10, we want to close it to $0, we need to increase the position
        to at least $10, then close it to $0.

        Returns (trades, None) on success or (None, reason) on failure.
        """
        rounded_up = self._round_size_up_to_min_notional(coin, min_trade_value, price)
        if rounded_up is None:
            return None, "Unknown szDecimals (meta)"
        min_increase_sz, _ = rounded_up

        increase_is_buy = current_value > 0
        force_id = f"force_close:{coin}"

        step1 = {
            "coin": coin,
            "is_buy": increase_is_buy,
            "sz": min_increase_sz,
            "price": price,
            "current_value": current_value,
            "target_value": current_value
            + (
                min_increase_sz * price
                if current_value >= 0
                else -min_increase_sz * price
            ),
            "delta_value": min_increase_sz * price
            if increase_is_buy
            else -(min_increase_sz * price),
            "type": "increase",
            "force": True,
            "force_id": force_id,
            "force_seq": 0,
        }

        total_notional_to_close = abs(current_value) + (min_increase_sz * price)
        close_is_buy = not increase_is_buy
        close_sz_rounded = self._round_size(coin, total_notional_to_close / price)
        if close_sz_rounded is None:
            return None, "Unknown szDecimals (meta)"
        close_sz, _ = close_sz_rounded

        step2 = {
            "coin": coin,
            "is_buy": close_is_buy,
            "sz": close_sz,
            "price": price,
            "current_value": current_value
            + (
                min_increase_sz * price
                if increase_is_buy
                else -(min_increase_sz * price)
            ),
            "target_value": 0,
            "delta_value": total_notional_to_close
            if close_is_buy
            else -total_notional_to_close,
            "type": "close",
            "force": True,
            "force_id": force_id,
            "force_seq": 1,
        }

        # Ensure both meet minimum notional
        if (step1["sz"] * price) < min_trade_value or (
            step2["sz"] * price
        ) < min_trade_value:
            return (
                None,
                f"Below minimum even after force composition (${min_trade_value})",
            )

        return [step1, step2], None

    def _round_size_up_to_min_notional(
        self, coin: str, target_notional: float, price: float
    ) -> tuple[float, int] | None:
        """Return (size, decimals) such that size*price >= target_notional after rounding to szDecimals.

        Rounds up to the nearest step defined by szDecimals to satisfy the notional constraint.
        """
        sz_decimals = self._get_sz_decimals(coin)
        if sz_decimals is None:
            return None
        raw_size = target_notional / price if price > 0 else 0
        if raw_size <= 0:
            return (0.0, sz_decimals)
        step = 10 ** (-sz_decimals)
        # Avoid floating imprecision by working in integer steps
        steps_needed = math.ceil(raw_size / step)
        rounded_up_size = steps_needed * step
        # Round to the allowed decimals to avoid long floats
        rounded_up_size = round(rounded_up_size, sz_decimals)
        return rounded_up_size, sz_decimals

    def _execute_trades(self, trades: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Executes a list of trades using the SDK's market_open for robustness."""
        if not trades:
            return []

        successful_trades = []
        failed_trades = []

        # Show progress during execution
        self.callbacks.info(f"Executing {len(trades)} trades...")

        for i, trade in enumerate(trades, 1):
            # Notify callback of trade start
            self.callbacks.on_trade_start(i, len(trades), trade)

            try:
                self.logger.debug(f"Executing trade: {trade}")
                # Pass vault address if trading on behalf of a vault/subaccount.
                result = self.exchange.market_open(
                    name=trade["coin"],
                    is_buy=trade["is_buy"],
                    sz=trade["sz"],  # Already a float
                    slippage=self.config.execution.slippage_tolerance,
                )

                statuses = (
                    result.get("response", {}).get("data", {}).get("statuses", [])
                )

                # Check for filled orders
                filled_data = None
                for status in statuses:
                    if "filled" in status:
                        filled_data = status["filled"]
                        break

                if filled_data:
                    # Extract fill details
                    float(filled_data.get("totalSz", trade["sz"]))
                    avg_px = float(filled_data.get("avgPx", trade["price"]))

                    # Calculate slippage
                    if trade["is_buy"]:
                        slippage_pct = (
                            (avg_px - trade["price"]) / trade["price"]
                        ) * 100
                    else:
                        slippage_pct = (
                            (trade["price"] - avg_px) / trade["price"]
                        ) * 100

                    self.callbacks.on_trade_fill(trade, filled_data, slippage_pct)

                    successful_trades.append(
                        {
                            **trade,
                            "fill_data": filled_data,
                            "slippage_pct": slippage_pct,
                        }
                    )
                else:
                    # Handle errors or unfilled orders
                    errors = [s.get("error") for s in statuses if "error" in s]
                    error_msg = errors[0] if errors else "Order not filled"

                    self.callbacks.on_trade_fail(trade, error_msg)
                    failed_trades.append(trade)

            except Exception as e:
                self.callbacks.on_trade_fail(trade, str(e))
                self.logger.error(f"Error executing trade for {trade['coin']}: {e}")
                failed_trades.append(trade)

        # Notify callback of batch completion
        self.callbacks.on_batch_complete(successful_trades, failed_trades)

        return successful_trades

    def load_state(self) -> datetime | None:
        """Public wrapper to load last rebalance timestamp."""
        return self._load_state()

    def save_state(self, last_rebalance_date: datetime) -> None:
        """Public wrapper to persist last rebalance timestamp."""
        self._save_state(last_rebalance_date)

    def compute_next_rebalance_time(
        self, last_rebalance_date: datetime | None, now: datetime | None = None
    ) -> datetime:
        """Compute the next scheduled rebalance timestamp in UTC.

        Rules:
        - If this is the first run (no last date): schedule for today at configured time; if
          already past that time, return "now" to indicate it is due immediately.
        - Otherwise: schedule exactly every_n_days after the last rebalance date, at the
          configured time.
        """
        cfg = self.config.portfolio.rebalancing
        now_utc = now or datetime.now(timezone.utc)

        hour, minute = map(int, cfg.at_time.split(":"))
        rebalance_time = time(hour=hour, minute=minute)

        if last_rebalance_date is None:
            today_at = datetime.combine(
                now_utc.date(), rebalance_time, tzinfo=timezone.utc
            )
            return today_at if now_utc < today_at else now_utc

        next_date = last_rebalance_date.date() + timedelta(days=cfg.every_n_days)
        return datetime.combine(next_date, rebalance_time, tzinfo=timezone.utc)

    def _load_state(self) -> datetime | None:
        """Load the last rebalance date from persistent state."""
        import json
        import os

        state_file = ".cc_liquid_state.json"
        if not os.path.exists(state_file):
            return None

        try:
            with open(state_file) as f:
                state = json.load(f)
                last_date = datetime.fromisoformat(state.get("last_rebalance_date"))
                return last_date
        except Exception as e:
            self.logger.warning(f"Could not load state file: {e}")
            return None

    def _save_state(self, last_rebalance_date: datetime):
        """Save the last rebalance date to persistent state."""
        import json

        state_file = ".cc_liquid_state.json"
        with open(state_file, "w") as f:
            json.dump({"last_rebalance_date": last_rebalance_date.isoformat()}, f)

    def _load_predictions(self) -> pl.DataFrame | None:
        """Load predictions based on configured data source."""
        try:
            if self.config.data.source == "local":
                # Use local file
                predictions = DataLoader.from_file(
                    self.config.data.path,
                    date_col=self.config.data.date_column,
                    id_col=self.config.data.asset_id_column,
                    pred_col=self.config.data.prediction_column,
                )
            elif self.config.data.source == "crowdcent":
                # Download and use CrowdCent meta model
                predictions = DataLoader.from_crowdcent_api(
                    api_key=self.config.CROWDCENT_API_KEY,
                    download_path=self.config.data.path,
                    date_col=self.config.data.date_column,
                    id_col=self.config.data.asset_id_column,
                    pred_col=self.config.data.prediction_column,
                )
            elif self.config.data.source == "numerai":
                # Download and use Numerai meta model
                predictions = DataLoader.from_numerai_api(
                    download_path=self.config.data.path,
                    date_col=self.config.data.date_column,
                    id_col=self.config.data.asset_id_column,
                    pred_col=self.config.data.prediction_column,
                )
            else:
                raise ValueError(f"Unknown data source: {self.config.data.source}")

            return predictions

        except Exception as e:
            self.logger.error(f"Error loading predictions: {e}")
            return None

compute_next_rebalance_time(last_rebalance_date, now=None)

Compute the next scheduled rebalance timestamp in UTC.

Rules: - If this is the first run (no last date): schedule for today at configured time; if already past that time, return "now" to indicate it is due immediately. - Otherwise: schedule exactly every_n_days after the last rebalance date, at the configured time.

Source code in src/cc_liquid/trader.py
def compute_next_rebalance_time(
    self, last_rebalance_date: datetime | None, now: datetime | None = None
) -> datetime:
    """Compute the next scheduled rebalance timestamp in UTC.

    Rules:
    - If this is the first run (no last date): schedule for today at configured time; if
      already past that time, return "now" to indicate it is due immediately.
    - Otherwise: schedule exactly every_n_days after the last rebalance date, at the
      configured time.
    """
    cfg = self.config.portfolio.rebalancing
    now_utc = now or datetime.now(timezone.utc)

    hour, minute = map(int, cfg.at_time.split(":"))
    rebalance_time = time(hour=hour, minute=minute)

    if last_rebalance_date is None:
        today_at = datetime.combine(
            now_utc.date(), rebalance_time, tzinfo=timezone.utc
        )
        return today_at if now_utc < today_at else now_utc

    next_date = last_rebalance_date.date() + timedelta(days=cfg.every_n_days)
    return datetime.combine(next_date, rebalance_time, tzinfo=timezone.utc)

execute_plan(plan)

Execute a precomputed plan, returning structured results.

Source code in src/cc_liquid/trader.py
def execute_plan(self, plan: dict) -> dict:
    """Execute a precomputed plan, returning structured results."""
    trades: list[dict] = plan.get("trades", [])
    if not trades:
        # Nothing to do
        return {"successful_trades": [], "all_trades": trades}

    # Prioritize leverage reduction: execute closes/reductions (and flips) before opens
    trades = self._sort_trades_for_leverage_reduction(trades)

    self.callbacks.info(f"Starting execution of {len(trades)} trades...")
    successful_trades = self._execute_trades(trades)
    return {"successful_trades": successful_trades, "all_trades": trades}

get_account_value()

Retrieves the total account value in USD.

Source code in src/cc_liquid/trader.py
def get_account_value(self) -> float:
    """Retrieves the total account value in USD."""
    user_state = self.get_user_state()
    return float(user_state["marginSummary"]["accountValue"])

get_portfolio_info()

Get complete portfolio information as structured data.

Source code in src/cc_liquid/trader.py
def get_portfolio_info(self) -> PortfolioInfo:
    """Get complete portfolio information as structured data."""
    try:
        user_state = self.get_user_state()
    except Exception as e:
        self.logger.warning(f"Could not get user state: {e}")
        # Return empty portfolio if we can't connect
        return PortfolioInfo(
            account=AccountInfo(
                account_value=0,
                total_position_value=0,
                margin_used=0,
                free_collateral=0,
                cash_balance=0,
                withdrawable=0,
                current_leverage=0,
            ),
            positions=[],
        )

    margin_summary = user_state.get("marginSummary", {}) if user_state else {}
    all_mids = self.info.all_mids() if user_state else {}

    # Build account info
    account_info = AccountInfo(
        account_value=float(margin_summary.get("accountValue", 0)),
        total_position_value=float(margin_summary.get("totalNtlPos", 0)),
        margin_used=float(margin_summary.get("totalMarginUsed", 0)),
        free_collateral=float(margin_summary.get("accountValue", 0))
        - float(margin_summary.get("totalMarginUsed", 0)),
        cash_balance=float(margin_summary.get("totalRawUsd", 0)),
        withdrawable=float(user_state.get("withdrawable", 0)),
        current_leverage=float(margin_summary.get("totalNtlPos", 0))
        / float(margin_summary.get("accountValue", 1))
        if float(margin_summary.get("accountValue", 0)) > 0
        else 0,
        raw_margin_summary=margin_summary,
    )

    # Add cross margin info if available
    cross_margin = user_state.get("crossMarginSummary")
    if cross_margin:
        account_info.cross_leverage = (
            float(cross_margin.get("accountValue", 0))
            / float(margin_summary.get("accountValue", 1))
            if float(margin_summary.get("accountValue", 0)) > 0
            else 0
        )
        account_info.cross_margin_used = float(
            cross_margin.get("totalMarginUsed", 0)
        )
        account_info.cross_maintenance_margin = float(
            cross_margin.get("totalMaintenanceMargin", 0)
        )
        account_info.raw_cross_margin_summary = cross_margin

    # Build positions list
    positions = []
    for position_data in user_state.get("assetPositions", []):
        pos = position_data.get("position", {})
        size = float(pos.get("szi", 0))

        if size == 0:
            continue

        coin = pos["coin"]
        entry_px = float(pos.get("entryPx", 0))
        mark_px = float(all_mids.get(coin, entry_px))
        position_value = abs(size * mark_px)

        # Calculate unrealized PnL
        if size > 0:
            unrealized_pnl = (mark_px - entry_px) * size
            side = "LONG"
        else:
            unrealized_pnl = (entry_px - mark_px) * abs(size)
            side = "SHORT"

        return_pct = (
            (unrealized_pnl / (abs(size) * entry_px) * 100) if entry_px > 0 else 0
        )

        positions.append(
            Position(
                coin=coin,
                side=side,
                size=abs(size),
                entry_price=entry_px,
                mark_price=mark_px,
                value=position_value,
                unrealized_pnl=unrealized_pnl,
                return_pct=return_pct,
                liquidation_price=float(pos["liquidationPx"])
                if "liquidationPx" in pos and pos["liquidationPx"] is not None
                else None,
                margin_used=float(pos["marginUsed"])
                if "marginUsed" in pos and pos["marginUsed"] is not None
                else None,
            )
        )

    return PortfolioInfo(account=account_info, positions=positions)

get_positions()

Retrieves the user's open positions as a dict.

Source code in src/cc_liquid/trader.py
def get_positions(self) -> dict[str, Any]:
    """Retrieves the user's open positions as a dict."""
    user_state = self.get_user_state()
    positions = {}
    for position_data in user_state.get("assetPositions", []):
        position = position_data.get("position", {})
        if float(position.get("szi", 0)) != 0:
            positions[position["coin"]] = position
    return positions

get_user_state()

Retrieves the current state of the user's account.

Source code in src/cc_liquid/trader.py
def get_user_state(self) -> dict[str, Any]:
    """Retrieves the current state of the user's account."""
    # Always query Info using the portfolio owner: vault (if set) or master address.
    # Never use the agent/signer address for Info, as it has no balances.
    owner = self.config.HYPERLIQUID_VAULT_ADDRESS or self.config.HYPERLIQUID_ADDRESS
    if not owner:
        raise ValueError(
            "Missing portfolio owner. Set HYPERLIQUID_VAULT_ADDRESS or HYPERLIQUID_ADDRESS."
        )
    return self.info.user_state(owner)

load_state()

Public wrapper to load last rebalance timestamp.

Source code in src/cc_liquid/trader.py
def load_state(self) -> datetime | None:
    """Public wrapper to load last rebalance timestamp."""
    return self._load_state()

plan_close_all_positions(*, force=False)

Plan to close all open positions (return to cash) without executing orders.

Source code in src/cc_liquid/trader.py
def plan_close_all_positions(self, *, force: bool = False) -> dict:
    """Plan to close all open positions (return to cash) without executing orders."""
    current_positions = self.get_positions()

    if not current_positions:
        self.callbacks.info("No open positions to close.")
        return {
            "target_positions": {},
            "trades": [],
            "skipped_trades": [],
            "account_value": self.get_account_value(),
            "leverage": self.config.portfolio.target_leverage,
        }

    self.callbacks.info("Closing all positions to return to cash...")

    # Create target positions of 0 for all current positions
    target_positions = {coin: 0 for coin in current_positions.keys()}
    trades, skipped_trades = self._calculate_trades(
        target_positions, current_positions, force=force
    )

    account_value = self.get_account_value()
    leverage = self.config.portfolio.target_leverage
    return {
        "target_positions": target_positions,
        "trades": trades,
        "skipped_trades": skipped_trades,
        "account_value": account_value,
        "leverage": leverage,
    }

plan_rebalance(predictions=None)

Compute a rebalancing plan without executing orders.

Source code in src/cc_liquid/trader.py
def plan_rebalance(self, predictions: pl.DataFrame | None = None) -> dict:
    """Compute a rebalancing plan without executing orders."""
    # Load predictions if not provided
    if predictions is None:
        self.callbacks.info("Loading predictions...")
        predictions = self._load_predictions()

        if predictions is None or predictions.is_empty():
            self.callbacks.error("No predictions available, cannot rebalance")
            return {
                "target_positions": {},
                "trades": [],
                "skipped_trades": [],
                "account_value": 0.0,
                "leverage": self.config.portfolio.target_leverage,
            }

        # Display prediction info
        unique_assets = predictions[self.config.data.asset_id_column].n_unique()
        latest_data = predictions[self.config.data.date_column].max()
        self.callbacks.info(
            f"Loaded predictions for {unique_assets} assets (latest: {latest_data})"
        )

    # Asset Selection, Position Calculation, and Trade Generation
    target_positions = self._get_target_positions(predictions)
    current_positions = self.get_positions()
    trades, skipped_trades = self._calculate_trades(
        target_positions, current_positions
    )

    # Build plan (including skipped trades)
    account_value = self.get_account_value()
    leverage = self.config.portfolio.target_leverage
    return {
        "target_positions": target_positions,
        "trades": trades,
        "skipped_trades": skipped_trades,
        "account_value": account_value,
        "leverage": leverage,
    }

save_state(last_rebalance_date)

Public wrapper to persist last rebalance timestamp.

Source code in src/cc_liquid/trader.py
def save_state(self, last_rebalance_date: datetime) -> None:
    """Public wrapper to persist last rebalance timestamp."""
    self._save_state(last_rebalance_date)

Callback protocols

CCLiquidCallbacks

Bases: Protocol

Protocol for trader callbacks to abstract UI/UX concerns.

Source code in src/cc_liquid/callbacks.py
class CCLiquidCallbacks(Protocol):
    """Protocol for trader callbacks to abstract UI/UX concerns."""

    # High-level lifecycle methods
    def ask_confirmation(self, message: str) -> bool:
        """Ask user for confirmation."""
        ...

    def info(self, message: str) -> None:
        """Display info message."""
        ...

    def warn(self, message: str) -> None:
        """Display warning message."""
        ...

    def error(self, message: str) -> None:
        """Display error message."""
        ...

    def on_config_override(self, overrides: list[str]) -> None:
        """Display applied configuration overrides."""
        ...

    # Trade execution progress hooks
    def on_trade_start(self, idx: int, total: int, trade: dict[str, Any]) -> None:
        """Called when a trade execution starts."""
        ...

    def on_trade_fill(
        self, trade: dict[str, Any], fill_data: dict[str, Any], slippage_pct: float
    ) -> None:
        """Called when a trade is filled."""
        ...

    def on_trade_fail(self, trade: dict[str, Any], reason: str) -> None:
        """Called when a trade fails."""
        ...

    def on_batch_complete(self, success: list[dict], failed: list[dict]) -> None:
        """Called when a batch of trades completes."""
        ...

    def show_trade_plan(
        self,
        target_positions: dict,
        trades: list,
        account_value: float,
        leverage: float,
    ) -> None:
        """Display the trade plan before execution."""
        ...

    def show_execution_summary(
        self,
        successful_trades: list[dict],
        all_trades: list[dict],
        target_positions: dict,
        account_value: float,
    ) -> None:
        """Display execution summary after trades complete."""
        ...

ask_confirmation(message)

Ask user for confirmation.

Source code in src/cc_liquid/callbacks.py
def ask_confirmation(self, message: str) -> bool:
    """Ask user for confirmation."""
    ...

error(message)

Display error message.

Source code in src/cc_liquid/callbacks.py
def error(self, message: str) -> None:
    """Display error message."""
    ...

info(message)

Display info message.

Source code in src/cc_liquid/callbacks.py
def info(self, message: str) -> None:
    """Display info message."""
    ...

on_batch_complete(success, failed)

Called when a batch of trades completes.

Source code in src/cc_liquid/callbacks.py
def on_batch_complete(self, success: list[dict], failed: list[dict]) -> None:
    """Called when a batch of trades completes."""
    ...

on_config_override(overrides)

Display applied configuration overrides.

Source code in src/cc_liquid/callbacks.py
def on_config_override(self, overrides: list[str]) -> None:
    """Display applied configuration overrides."""
    ...

on_trade_fail(trade, reason)

Called when a trade fails.

Source code in src/cc_liquid/callbacks.py
def on_trade_fail(self, trade: dict[str, Any], reason: str) -> None:
    """Called when a trade fails."""
    ...

on_trade_fill(trade, fill_data, slippage_pct)

Called when a trade is filled.

Source code in src/cc_liquid/callbacks.py
def on_trade_fill(
    self, trade: dict[str, Any], fill_data: dict[str, Any], slippage_pct: float
) -> None:
    """Called when a trade is filled."""
    ...

on_trade_start(idx, total, trade)

Called when a trade execution starts.

Source code in src/cc_liquid/callbacks.py
def on_trade_start(self, idx: int, total: int, trade: dict[str, Any]) -> None:
    """Called when a trade execution starts."""
    ...

show_execution_summary(successful_trades, all_trades, target_positions, account_value)

Display execution summary after trades complete.

Source code in src/cc_liquid/callbacks.py
def show_execution_summary(
    self,
    successful_trades: list[dict],
    all_trades: list[dict],
    target_positions: dict,
    account_value: float,
) -> None:
    """Display execution summary after trades complete."""
    ...

show_trade_plan(target_positions, trades, account_value, leverage)

Display the trade plan before execution.

Source code in src/cc_liquid/callbacks.py
def show_trade_plan(
    self,
    target_positions: dict,
    trades: list,
    account_value: float,
    leverage: float,
) -> None:
    """Display the trade plan before execution."""
    ...

warn(message)

Display warning message.

Source code in src/cc_liquid/callbacks.py
def warn(self, message: str) -> None:
    """Display warning message."""
    ...