Skip to content

Commit

Permalink
Add framing support for file decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPenner committed Dec 17, 2024
1 parent 503e501 commit e4a7512
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 15 deletions.
2 changes: 2 additions & 0 deletions unison-cli/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ library:
- condition: "!os(windows)"
dependencies: unix
dependencies:
- attoparsec
- Diff
- IntervalMap
- ListLike
Expand All @@ -34,6 +35,7 @@ library:
- concurrent-output
- containers >= 0.6.3
- conduit
- conduit-extra
- cryptonite
- either
- errors
Expand Down
46 changes: 33 additions & 13 deletions unison-cli/src/Unison/Share/SyncV2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader (ask)
import Control.Monad.ST (ST, stToIO)
import Data.Attoparsec.ByteString qualified as A
import Data.Attoparsec.ByteString.Char8 qualified as A8
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.List qualified as C
import Data.Graph qualified as Graph
import Data.Map qualified as Map
Expand Down Expand Up @@ -158,7 +162,7 @@ streamDownloadEntitiesFromCodeserver unisonShareUrl branchRef hashJwt knownHashe
Cli.Env {authHTTPClient, codebase} <- ask
runExceptT do
let hash = Share.hashJWTHash hashJwt
ExceptT $
ExceptT $ do
(Cli.runTransaction (Q.entityLocation hash)) >>= \case
Just Q.EntityInMainStorage -> pure $ Right ()
-- Just Q.EntityInTempStorage -> error "TODO: implement temp storage handler"
Expand All @@ -171,8 +175,7 @@ streamDownloadEntitiesFromCodeserver unisonShareUrl branchRef hashJwt knownHashe
SyncV2.DownloadEntitiesRequest {branchRef, causalHash = hashJwt, knownHashes}
\header stream -> do
doSync codebase header stream
afterSyncChecks codebase hash
pure ()
mapExceptT liftIO (afterSyncChecks codebase hash)

doSync :: Codebase.Codebase IO v a -> SyncV2.StreamInitInfo -> Stream () SyncV2.EntityChunk -> ExceptT PullErr IO ()
doSync codebase SyncV2.StreamInitInfo {version, entitySorting, numEntities = _todo} stream = do
Expand All @@ -184,7 +187,7 @@ doSync codebase SyncV2.StreamInitInfo {version, entitySorting, numEntities = _to
SyncV2.DependenciesFirst -> syncSortedStream codebase stream
SyncV2.Unsorted -> syncUnsortedStream codebase stream

afterSyncChecks :: Codebase.Codebase IO v a -> Hash32 -> ExceptT PullErr Cli ()
afterSyncChecks :: Codebase.Codebase IO v a -> Hash32 -> SyncM ()
afterSyncChecks codebase hash = do
lift (didCausalSuccessfullyImport codebase hash) >>= \case
False -> do
Expand All @@ -193,10 +196,10 @@ afterSyncChecks codebase hash = do
void $ liftIO (Codebase.withConnection codebase Sqlite.vacuum)
where
-- Verify that the expected hash made it into main storage.
didCausalSuccessfullyImport :: Codebase.Codebase IO v a -> Hash32 -> Cli Bool
didCausalSuccessfullyImport :: Codebase.Codebase IO v a -> Hash32 -> IO Bool
didCausalSuccessfullyImport codebase hash = do
let expectedHash = hash32ToCausalHash hash
isJust <$> liftIO (Codebase.runTransaction codebase $ Q.loadCausalByCausalHash expectedHash)
isJust <$> (Codebase.runTransaction codebase $ Q.loadCausalByCausalHash expectedHash)

-- | Topologically sort entities based on their dependencies.
sortDependencyFirst :: [(Hash32, TempEntity)] -> [(Hash32, TempEntity)]
Expand All @@ -213,15 +216,32 @@ streamDownloadEntitiesFromFile syncFilePath = do
Cli.Env {codebase} <- ask
runExceptT do
Debug.debugLogM Debug.Temp $ "Kicking off sync"
Timing.time "File Sync" $ liftIO . runExceptT $ do
let stream = C.transPipe C.runResourceT $ C.sourceFile syncFilePath C..| C.transPipe lift entitiesFromBS
mapExceptT liftIO $ Timing.time "File Sync" $ do
let stream = C.transPipe (liftIO . C.runResourceT) (C.sourceFile syncFilePath) C..| unNetString C..| C.mapM decodeFramedEntity
(header, rest) <- initializeStream stream
doSync codebase header rest
afterSyncChecks codebase hash
afterSyncChecks codebase (SyncV2.rootCausalHash header)

-- | Splits a NetString-framed byte stream (@<decimal length>:<payload>,@) into
-- its payloads, yielding one 'ByteString' per frame until upstream is exhausted.
--
-- Frames are parsed repeatedly with 'C.conduitParser'. A single 'C.sinkParser'
-- call only consumes one value, which would yield the first frame and then
-- stop, silently dropping every frame after it.
--
-- A malformed frame is reported by 'C.conduitParser' as a parse error raised
-- in the underlying monad.
unNetString :: ConduitT ByteString ByteString SyncM ()
unNetString = C.conduitParser netStringFrame C..| C.map snd
  where
    -- One frame: the payload length in decimal, ':', exactly that many bytes,
    -- then the terminating ','.
    netStringFrame :: A.Parser ByteString
    netStringFrame = do
      len <- A8.decimal
      _ <- A8.char ':'
      payload <- A.take len
      _ <- A8.char ','
      pure payload

-- | Decodes the payload of a single frame as one CBOR-encoded
-- 'SyncV2.DownloadEntitiesChunk'.
--
-- A payload that fails to deserialise is reported as a
-- 'SyncV2.SyncErrorDeserializationFailure' carrying the decoder's error.
decodeFramedEntity :: ByteString -> SyncM SyncV2.DownloadEntitiesChunk
decodeFramedEntity =
  either deserializationFailure pure . CBOR.deserialiseOrFail . BL.fromStrict
  where
    -- Wrap the decoder's failure in the sync error hierarchy and abort.
    deserializationFailure failure =
      throwError . SyncError . SyncV2.PullError'Sync $
        SyncV2.SyncErrorDeserializationFailure failure

-- Expects a stream of tightly-packed CBOR entities without any framing/separators.
entitiesFromBS :: ConduitT ByteString SyncV2.DownloadEntitiesChunk SyncM ()
entitiesFromBS = C.transPipe (mapExceptT stToIO) $ do
_decodeUnframedEntities :: ConduitT ByteString SyncV2.DownloadEntitiesChunk SyncM ()
_decodeUnframedEntities = C.transPipe (mapExceptT stToIO) $ do
C.await >>= \case
Nothing -> pure ()
Just bs -> do
Expand All @@ -231,7 +251,7 @@ entitiesFromBS = C.transPipe (mapExceptT stToIO) $ do
newDecoder :: ConduitT ByteString SyncV2.DownloadEntitiesChunk (ExceptT PullErr (ST s)) (Maybe ByteString -> ST s (CBOR.IDecode s (SyncV2.DownloadEntitiesChunk)))
newDecoder = do
(lift . lift) CBOR.deserialiseIncremental >>= \case
CBOR.Done _ _ _ -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure "Invalid initial decoder"
CBOR.Done _ _ _ -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorStreamFailure "Invalid initial decoder"
CBOR.Fail _ _ err -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure err
CBOR.Partial k -> pure k
loop :: ByteString -> (Maybe ByteString -> ST s (CBOR.IDecode s (SyncV2.DownloadEntitiesChunk))) -> ConduitT ByteString SyncV2.DownloadEntitiesChunk (ExceptT PullErr (ST s)) ()
Expand All @@ -247,7 +267,7 @@ entitiesFromBS = C.transPipe (mapExceptT stToIO) $ do
(lift . lift) (k' Nothing) >>= \case
CBOR.Done _ _ a -> C.yield a
CBOR.Fail _ _ err -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure err
CBOR.Partial _ -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure "Unexpected end of input"
CBOR.Partial _ -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorStreamFailure "Unexpected end of input"
Just bs' ->
-- Have some input, keep going.
loop bs' k'
Expand Down
2 changes: 2 additions & 0 deletions unison-cli/unison-cli.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,15 @@ library
, aeson-pretty
, ansi-terminal
, async
, attoparsec
, base
, bytestring
, cmark
, co-log-core
, code-page
, concurrent-output
, conduit
, conduit-extra
, containers >=0.6.3
, cryptonite
, directory
Expand Down
8 changes: 6 additions & 2 deletions unison-share-api/src/Unison/SyncV2/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ data StreamInitInfo
= StreamInitInfo
{ version :: Version,
entitySorting :: EntitySorting,
numEntities :: Maybe Word64
numEntities :: Maybe Word64,
rootCausalHash :: Hash32,
rootBranchRef :: Maybe BranchRef
}
deriving (Show, Eq, Ord)

Expand Down Expand Up @@ -194,7 +196,9 @@ instance Serialise StreamInitInfo where
version <- decodeMapKey "v" m
entitySorting <- decodeMapKey "es" m
numEntities <- (optionalDecodeMapKey "ne" m)
pure StreamInitInfo {version, entitySorting, numEntities}
rootCausalHash <- decodeMapKey "rc" m
rootBranchRef <- optionalDecodeMapKey "br" m
pure StreamInitInfo {version, entitySorting, numEntities, rootCausalHash, rootBranchRef}

data EntityChunk = EntityChunk
{ hash :: Hash32,
Expand Down

0 comments on commit e4a7512

Please sign in to comment.