From a17831f2f82245ab58237f6798a01a1dbdeb8989 Mon Sep 17 00:00:00 2001 From: Jake Williams Date: Mon, 16 Feb 2026 07:15:52 -0500 Subject: [PATCH] Add BCP IN/OUT (Bulk Copy Protocol) support for TDS 5.0 Implements bulk copy operations using the native TDS 5.0 BCP wire protocol, enabling high-performance data loading (BCP IN) and extraction (BCP OUT) for SAP ASE databases. Public API follows the SqlBulkCopy pattern: - AseBulkCopy.WriteToServer() for bulk insert from IDataReader/DataTable - AseBulkCopy.ReadFromServer() for bulk extraction to DataTable - Column mapping, batching, and row notification support Protocol details: - Native BCP row format (no TDS_ROW token prefix) - APL and DOL (datarows-locked) offset table formats - REQ_BCP capability negotiation - Supports all fixed and variable-length TDS data types (TEXT/IMAGE/UNITEXT deferred) Verified: 508-row round-trip (BCP OUT -> BCP IN) with full data integrity. All 1389 existing unit tests pass with no regressions. Co-Authored-By: Claude Opus 4.6 --- src/AdoNetCore.AseClient/AseBulkCopy.cs | 293 ++++++++++ .../AseBulkCopyColumnMapping.cs | 72 +++ .../AseBulkCopyColumnMappingCollection.cs | 114 ++++ .../AseBulkCopyRowsCopiedEventArgs.cs | 34 ++ src/AdoNetCore.AseClient/Enum/BufferType.cs | 3 +- .../Interface/IInternalConnection.cs | 13 + .../Internal/BulkCopy/BcpColumnMetadata.cs | 131 +++++ .../Internal/BulkCopy/BcpDataTableWriter.cs | 60 ++ .../Internal/BulkCopy/BcpRowFormatter.cs | 472 ++++++++++++++++ .../Internal/BulkCopy/BcpValueWriter.cs | 467 ++++++++++++++++ .../Internal/InternalConnection.cs | 274 ++++++++- .../Token/ClientCapabilityToken.cs | 2 +- .../BulkCopy/AseBulkCopyColumnMappingTests.cs | 170 ++++++ .../Unit/BulkCopy/BcpRowFormatterTests.cs | 528 ++++++++++++++++++ .../Unit/BulkCopy/BcpValueWriterTests.cs | 274 +++++++++ .../Unit/ConnectionPoolTests.cs | 9 + 16 files changed, 2912 insertions(+), 4 deletions(-) create mode 100644 src/AdoNetCore.AseClient/AseBulkCopy.cs create mode 100644 src/AdoNetCore.AseClient/AseBulkCopyColumnMapping.cs create mode 100644 src/AdoNetCore.AseClient/AseBulkCopyColumnMappingCollection.cs create mode 100644 src/AdoNetCore.AseClient/AseBulkCopyRowsCopiedEventArgs.cs create mode 100644 src/AdoNetCore.AseClient/Internal/BulkCopy/BcpColumnMetadata.cs create mode 100644 src/AdoNetCore.AseClient/Internal/BulkCopy/BcpDataTableWriter.cs create mode 100644 src/AdoNetCore.AseClient/Internal/BulkCopy/BcpRowFormatter.cs create mode 100644 src/AdoNetCore.AseClient/Internal/BulkCopy/BcpValueWriter.cs create mode 100644 test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/AseBulkCopyColumnMappingTests.cs create mode 100644 test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/BcpRowFormatterTests.cs create mode 100644 test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/BcpValueWriterTests.cs diff --git a/src/AdoNetCore.AseClient/AseBulkCopy.cs b/src/AdoNetCore.AseClient/AseBulkCopy.cs new file mode 100644 index 00000000..786ce3db --- /dev/null +++ b/src/AdoNetCore.AseClient/AseBulkCopy.cs @@ -0,0 +1,293 @@ +using System; +using System.Data; +using System.Threading; +using System.Threading.Tasks; +using AdoNetCore.AseClient.Interface; +using AdoNetCore.AseClient.Internal.BulkCopy; + +namespace AdoNetCore.AseClient +{ + /// + /// Provides bulk copy functionality for efficiently loading large volumes of data into ASE tables + /// using the TDS 5.0 BCP protocol. Also supports bulk extraction (BCP OUT) via ReadFromServer. + /// + /// + /// Follows the SqlBulkCopy pattern for API familiarity. TEXT/IMAGE/UNITEXT blob types are not + /// supported in the current implementation. + /// + public sealed class AseBulkCopy : IDisposable + { + private readonly AseConnection _connection; + private readonly AseTransaction _transaction; + private readonly bool _ownsConnection; + private bool _disposed; + + /// + /// Creates a new AseBulkCopy using an existing open connection. + /// + public AseBulkCopy(AseConnection connection) + { + _connection = connection ?? throw new ArgumentNullException(nameof(connection)); + _transaction = null; + _ownsConnection = false; + ColumnMappings = new AseBulkCopyColumnMappingCollection(); + } + + /// + /// Creates a new AseBulkCopy using an existing open connection and transaction. + /// + public AseBulkCopy(AseConnection connection, AseTransaction transaction) + { + _connection = connection ?? throw new ArgumentNullException(nameof(connection)); + _transaction = transaction; + _ownsConnection = false; + ColumnMappings = new AseBulkCopyColumnMappingCollection(); + } + + /// + /// Creates a new AseBulkCopy using a connection string. A new connection will be opened and owned by this instance. + /// + public AseBulkCopy(string connectionString) + { + if (string.IsNullOrWhiteSpace(connectionString)) + { + throw new ArgumentNullException(nameof(connectionString)); + } + + _connection = new AseConnection(connectionString); + _connection.Open(); + _transaction = null; + _ownsConnection = true; + ColumnMappings = new AseBulkCopyColumnMappingCollection(); + } + + /// + /// The name of the destination table on the server. + /// + public string DestinationTableName { get; set; } + + /// + /// Number of rows in each batch. At the end of each batch, the rows are sent to the server. + /// Setting to 0 means all rows are sent in a single batch. + /// + public int BatchSize { get; set; } + + /// + /// Defines the number of rows to be processed before generating a notification event. + /// Setting to 0 disables notifications. + /// + public int NotifyAfter { get; set; } + + /// + /// Column mappings between the data source and the destination table. + /// If empty, columns are mapped by ordinal position. + /// + public AseBulkCopyColumnMappingCollection ColumnMappings { get; } + + /// + /// Occurs every time the number of rows specified by have been processed. + /// + public event AseBulkCopyRowsCopiedEventHandler AseRowsCopied; + + /// + /// Copies all rows from the supplied IDataReader to the destination table. + /// + public void WriteToServer(IDataReader reader) + { + if (reader == null) throw new ArgumentNullException(nameof(reader)); + ThrowIfDisposed(); + ValidateDestination(); + + var internalConnection = _connection.InternalConnection; + internalConnection.ExecuteBulkCopy( + DestinationTableName, + reader, + ColumnMappings, + BatchSize, + NotifyAfter, + OnRowsCopied, + _transaction); + } + +#if ENABLE_SYSTEM_DATA_COMMON_EXTENSIONS + /// + /// Copies all rows from the supplied DataTable to the destination table. + /// + public void WriteToServer(DataTable table) + { + if (table == null) throw new ArgumentNullException(nameof(table)); + ThrowIfDisposed(); + ValidateDestination(); + + using (var reader = table.CreateDataReader()) + { + WriteToServer(reader); + } + } +#endif + + /// + /// Asynchronously copies all rows from the supplied IDataReader to the destination table. + /// + public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellationToken) + { + if (reader == null) throw new ArgumentNullException(nameof(reader)); + ThrowIfDisposed(); + ValidateDestination(); + + return Task.Run(() => WriteToServer(reader), cancellationToken); + } + + /// + /// Asynchronously copies all rows from the supplied IDataReader to the destination table. + /// + public Task WriteToServerAsync(IDataReader reader) + { + return WriteToServerAsync(reader, CancellationToken.None); + } + +#if ENABLE_SYSTEM_DATA_COMMON_EXTENSIONS + /// + /// Asynchronously copies all rows from the supplied DataTable to the destination table. + /// + public Task WriteToServerAsync(DataTable table, CancellationToken cancellationToken) + { + if (table == null) throw new ArgumentNullException(nameof(table)); + ThrowIfDisposed(); + ValidateDestination(); + + return Task.Run(() => WriteToServer(table), cancellationToken); + } +#endif + +#if ENABLE_SYSTEM_DATA_COMMON_EXTENSIONS + /// + /// Bulk extracts data from the server into a DataTable using the specified query. + /// + public DataTable ReadFromServer(string query) + { + if (string.IsNullOrWhiteSpace(query)) throw new ArgumentNullException(nameof(query)); + ThrowIfDisposed(); + + var table = new DataTable(); + + using (var cmd = _connection.CreateCommand()) + { + cmd.CommandText = query; + if (_transaction != null) + { + cmd.Transaction = _transaction; + } + + using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess)) + { + BcpDataTableWriter.Fill(table, reader, NotifyAfter, OnRowsCopied, this); + } + } + + return table; + } +#endif + + /// + /// Bulk extracts data from the server, invoking the row handler for each row. + /// + public void ReadFromServer(string query, Action rowHandler) + { + if (string.IsNullOrWhiteSpace(query)) throw new ArgumentNullException(nameof(query)); + if (rowHandler == null) throw new ArgumentNullException(nameof(rowHandler)); + ThrowIfDisposed(); + + using (var cmd = _connection.CreateCommand()) + { + cmd.CommandText = query; + if (_transaction != null) + { + cmd.Transaction = _transaction; + } + + using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess)) + { + long rowCount = 0; + while (reader.Read()) + { + rowHandler(reader); + rowCount++; + + if (NotifyAfter > 0 && rowCount % NotifyAfter == 0) + { + if (OnRowsCopied(rowCount)) + { + break; + } + } + } + } + } + } + +#if ENABLE_SYSTEM_DATA_COMMON_EXTENSIONS + /// + /// Asynchronously bulk extracts data from the server into a DataTable. + /// + public Task ReadFromServerAsync(string query, CancellationToken cancellationToken) + { + return Task.Run(() => ReadFromServer(query), cancellationToken); + } +#endif + + /// + /// Asynchronously bulk extracts data from the server, invoking the row handler for each row. + /// + public Task ReadFromServerAsync(string query, Action rowHandler, CancellationToken cancellationToken) + { + return Task.Run(() => ReadFromServer(query, rowHandler), cancellationToken); + } + + /// + /// Fires the AseRowsCopied event. Returns true if the operation should be aborted. + /// + internal bool OnRowsCopied(long rowsCopied) + { + var handler = AseRowsCopied; + if (handler != null) + { + var args = new AseBulkCopyRowsCopiedEventArgs(rowsCopied); + handler(this, args); + return args.Abort; + } + return false; + } + + private void ValidateDestination() + { + if (string.IsNullOrWhiteSpace(DestinationTableName)) + { + throw new InvalidOperationException("DestinationTableName must be set before calling WriteToServer."); + } + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(AseBulkCopy)); + } + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _disposed = true; + + if (_ownsConnection) + { + _connection.Dispose(); + } + } + } +} diff --git a/src/AdoNetCore.AseClient/AseBulkCopyColumnMapping.cs b/src/AdoNetCore.AseClient/AseBulkCopyColumnMapping.cs new file mode 100644 index 00000000..ff5fbd63 --- /dev/null +++ b/src/AdoNetCore.AseClient/AseBulkCopyColumnMapping.cs @@ -0,0 +1,72 @@ +namespace AdoNetCore.AseClient +{ + /// + /// Defines the mapping between a column in an data source and a column in the destination table. + /// + public sealed class AseBulkCopyColumnMapping + { + /// + /// Creates a mapping using source and destination column ordinals. + /// + public AseBulkCopyColumnMapping(int sourceColumnOrdinal, int destinationOrdinal) + { + SourceOrdinal = sourceColumnOrdinal; + DestinationOrdinal = destinationOrdinal; + SourceColumn = null; + DestinationColumn = null; + } + + /// + /// Creates a mapping using source and destination column names. + /// + public AseBulkCopyColumnMapping(string sourceColumn, string destinationColumn) + { + SourceColumn = sourceColumn; + DestinationColumn = destinationColumn; + SourceOrdinal = -1; + DestinationOrdinal = -1; + } + + /// + /// Creates a mapping using a source column name and destination ordinal. + /// + public AseBulkCopyColumnMapping(string sourceColumn, int destinationOrdinal) + { + SourceColumn = sourceColumn; + DestinationOrdinal = destinationOrdinal; + SourceOrdinal = -1; + DestinationColumn = null; + } + + /// + /// Creates a mapping using a source ordinal and destination column name. + /// + public AseBulkCopyColumnMapping(int sourceColumnOrdinal, string destinationColumn) + { + SourceOrdinal = sourceColumnOrdinal; + DestinationColumn = destinationColumn; + SourceColumn = null; + DestinationOrdinal = -1; + } + + /// + /// The name of the column in the data source. + /// + public string SourceColumn { get; set; } + + /// + /// The ordinal of the column in the data source. + /// + public int SourceOrdinal { get; set; } + + /// + /// The name of the column in the destination table. + /// + public string DestinationColumn { get; set; } + + /// + /// The ordinal of the column in the destination table. + /// + public int DestinationOrdinal { get; set; } + } +} diff --git a/src/AdoNetCore.AseClient/AseBulkCopyColumnMappingCollection.cs b/src/AdoNetCore.AseClient/AseBulkCopyColumnMappingCollection.cs new file mode 100644 index 00000000..c3472f19 --- /dev/null +++ b/src/AdoNetCore.AseClient/AseBulkCopyColumnMappingCollection.cs @@ -0,0 +1,114 @@ +using System.Collections; +using System.Collections.Generic; + +namespace AdoNetCore.AseClient +{ + /// + /// A collection of objects that define column mappings + /// between a data source and the destination table. + /// + public sealed class AseBulkCopyColumnMappingCollection : IList + { + private readonly List _mappings = new List(); + + /// + /// Adds a mapping using source and destination column names. + /// + public AseBulkCopyColumnMapping Add(string sourceColumn, string destinationColumn) + { + var mapping = new AseBulkCopyColumnMapping(sourceColumn, destinationColumn); + _mappings.Add(mapping); + return mapping; + } + + /// + /// Adds a mapping using source and destination column ordinals. + /// + public AseBulkCopyColumnMapping Add(int sourceColumnOrdinal, int destinationOrdinal) + { + var mapping = new AseBulkCopyColumnMapping(sourceColumnOrdinal, destinationOrdinal); + _mappings.Add(mapping); + return mapping; + } + + /// + /// Adds a mapping using a source column name and destination ordinal. + /// + public AseBulkCopyColumnMapping Add(string sourceColumn, int destinationOrdinal) + { + var mapping = new AseBulkCopyColumnMapping(sourceColumn, destinationOrdinal); + _mappings.Add(mapping); + return mapping; + } + + /// + /// Adds a mapping using a source ordinal and destination column name. + /// + public AseBulkCopyColumnMapping Add(int sourceColumnOrdinal, string destinationColumn) + { + var mapping = new AseBulkCopyColumnMapping(sourceColumnOrdinal, destinationColumn); + _mappings.Add(mapping); + return mapping; + } + + public void Add(AseBulkCopyColumnMapping item) + { + _mappings.Add(item); + } + + public void Clear() + { + _mappings.Clear(); + } + + public bool Contains(AseBulkCopyColumnMapping item) + { + return _mappings.Contains(item); + } + + public void CopyTo(AseBulkCopyColumnMapping[] array, int arrayIndex) + { + _mappings.CopyTo(array, arrayIndex); + } + + public bool Remove(AseBulkCopyColumnMapping item) + { + return _mappings.Remove(item); + } + + public int Count => _mappings.Count; + + public bool IsReadOnly => false; + + public int IndexOf(AseBulkCopyColumnMapping item) + { + return _mappings.IndexOf(item); + } + + public void Insert(int index, AseBulkCopyColumnMapping item) + { + _mappings.Insert(index, item); + } + + public void RemoveAt(int index) + { + _mappings.RemoveAt(index); + } + + public AseBulkCopyColumnMapping this[int index] + { + get => _mappings[index]; + set => _mappings[index] = value; + } + + public IEnumerator GetEnumerator() + { + return _mappings.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + } +} diff --git a/src/AdoNetCore.AseClient/AseBulkCopyRowsCopiedEventArgs.cs b/src/AdoNetCore.AseClient/AseBulkCopyRowsCopiedEventArgs.cs new file mode 100644 index 00000000..57f42e31 --- /dev/null +++ b/src/AdoNetCore.AseClient/AseBulkCopyRowsCopiedEventArgs.cs @@ -0,0 +1,34 @@ +using System; + +namespace AdoNetCore.AseClient +{ + /// + /// Provides data for the event. + /// + public class AseBulkCopyRowsCopiedEventArgs : EventArgs + { + /// + /// Creates an instance of . + /// + /// The number of rows copied so far. + public AseBulkCopyRowsCopiedEventArgs(long rowsCopied) + { + RowsCopied = rowsCopied; + } + + /// + /// The number of rows copied during the current bulk copy operation. + /// + public long RowsCopied { get; } + + /// + /// Set to true to abort the bulk copy operation. + /// + public bool Abort { get; set; } + } + + /// + /// Delegate for the event. + /// + public delegate void AseBulkCopyRowsCopiedEventHandler(object sender, AseBulkCopyRowsCopiedEventArgs e); +} diff --git a/src/AdoNetCore.AseClient/Enum/BufferType.cs b/src/AdoNetCore.AseClient/Enum/BufferType.cs index e37a71d6..438a83af 100644 --- a/src/AdoNetCore.AseClient/Enum/BufferType.cs +++ b/src/AdoNetCore.AseClient/Enum/BufferType.cs @@ -45,9 +45,8 @@ internal enum BufferType : byte /// TDS_BUF_ATTN = 6, /// - /// The buffer contains bulk binary data. + /// The buffer contains bulk binary data. Used for BCP (Bulk Copy Protocol) operations. /// - [Obsolete] TDS_BUF_BULK = 7, /// /// A protocol request to setup another logical channel. This buffer is a header only and does not contain any data. diff --git a/src/AdoNetCore.AseClient/Interface/IInternalConnection.cs b/src/AdoNetCore.AseClient/Interface/IInternalConnection.cs index 3078f129..bf222460 100644 --- a/src/AdoNetCore.AseClient/Interface/IInternalConnection.cs +++ b/src/AdoNetCore.AseClient/Interface/IInternalConnection.cs @@ -3,6 +3,7 @@ using System.Data; using System.Data.Common; using System.Threading.Tasks; +using AdoNetCore.AseClient.Internal.BulkCopy; namespace AdoNetCore.AseClient.Interface { @@ -128,5 +129,17 @@ bool NamedParameters /// The 'caller event' notifier for the connection /// IInfoMessageEventNotifier EventNotifier { get; set; } + + /// + /// Execute a bulk copy operation, sending rows from the reader to the specified destination table. + /// + void ExecuteBulkCopy( + string destinationTableName, + IDataReader reader, + AseBulkCopyColumnMappingCollection columnMappings, + int batchSize, + int notifyAfter, + Func onNotify, + AseTransaction transaction); } } diff --git a/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpColumnMetadata.cs b/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpColumnMetadata.cs new file mode 100644 index 00000000..0370ee81 --- /dev/null +++ b/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpColumnMetadata.cs @@ -0,0 +1,131 @@ +using AdoNetCore.AseClient.Enum; + +namespace AdoNetCore.AseClient.Internal.BulkCopy +{ + /// + /// Column metadata parsed from the server's response to an INSERT BULK command. + /// Used to determine how to format row data in native BCP format. + /// + internal class BcpColumnMetadata + { + /// + /// Column ordinal (0-based position in the row). + /// + public int Ordinal { get; set; } + + /// + /// Column name from the server. + /// + public string Name { get; set; } + + /// + /// TDS data type for the column. + /// + public TdsDataType DataType { get; set; } + + /// + /// Maximum length for variable-length types, or the fixed size for fixed-length types. + /// + public int Length { get; set; } + + /// + /// Precision for DECN/NUMN types. + /// + public byte Precision { get; set; } + + /// + /// Scale for DECN/NUMN types. + /// + public byte Scale { get; set; } + + /// + /// User-defined type from server metadata. + /// + public int UserType { get; set; } + + /// + /// Row format status flags (nullable, identity, etc.). + /// + public RowFormatItemStatus RowStatus { get; set; } + + /// + /// Whether this column allows NULL values. + /// + public bool IsNullable => (RowStatus & RowFormatItemStatus.TDS_ROW_NULLALLOWED) != 0; + + /// + /// Whether this column is an identity column. + /// + public bool IsIdentity => (RowStatus & RowFormatItemStatus.TDS_ROW_IDENTITY) != 0; + + /// + /// Whether this is a fixed-length data type in native BCP format. + /// Fixed-length types are packed without length prefixes. + /// + /// + /// Whether this column occupies a fixed offset in the native BCP row format. + /// Determined by the server's INSERT BULK metadata: positive offset = fixed, negative = variable. + /// + public bool IsFixedLength { get; set; } + + /// + /// Gets the native byte size for fixed-length data types. + /// For types like TDS_CHAR, TDS_NUMN, TDS_DECN, the size comes from the Length property. + /// + public int FixedByteSize + { + get + { + switch (DataType) + { + case TdsDataType.TDS_BIT: + case TdsDataType.TDS_INT1: + return 1; + case TdsDataType.TDS_INT2: + case TdsDataType.TDS_UINT2: + return 2; + case TdsDataType.TDS_INT4: + case TdsDataType.TDS_UINT4: + case TdsDataType.TDS_FLT4: + case TdsDataType.TDS_SHORTDATE: + case TdsDataType.TDS_DATE: + case TdsDataType.TDS_TIME: + case TdsDataType.TDS_SHORTMONEY: + return 4; + case TdsDataType.TDS_INT8: + case TdsDataType.TDS_UINT8: + case TdsDataType.TDS_FLT8: + case TdsDataType.TDS_DATETIME: + case TdsDataType.TDS_MONEY: + return 8; + // Types whose fixed size depends on column definition + case TdsDataType.TDS_CHAR: + case TdsDataType.TDS_BINARY: + case TdsDataType.TDS_NUMN: + case TdsDataType.TDS_DECN: + return Length; + default: + return Length > 0 ? Length : 0; + } + } + } + + /// + /// Creates BcpColumnMetadata from a FormatItem received from the server. + /// + public static BcpColumnMetadata FromFormatItem(FormatItem format, int ordinal) + { + return new BcpColumnMetadata + { + Ordinal = ordinal, + Name = format.DisplayColumnName, + DataType = format.DataType, + Length = format.Length ?? 0, + Precision = format.Precision ?? 0, + Scale = format.Scale ?? 0, + UserType = format.UserType, + RowStatus = format.RowStatus + }; + } + } +} diff --git a/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpDataTableWriter.cs b/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpDataTableWriter.cs new file mode 100644 index 00000000..d09a41cf --- /dev/null +++ b/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpDataTableWriter.cs @@ -0,0 +1,60 @@ +#if ENABLE_SYSTEM_DATA_COMMON_EXTENSIONS +using System; +using System.Data; + +namespace AdoNetCore.AseClient.Internal.BulkCopy +{ + /// + /// Efficiently populates a DataTable from a streaming IDataReader, + /// used for BCP OUT operations. + /// + internal static class BcpDataTableWriter + { + /// + /// Fills a DataTable from the reader, building schema from reader metadata. + /// + public static void Fill(DataTable table, IDataReader reader, int notifyAfter, Func onNotify, object sender) + { + // Build schema from reader + var fieldCount = reader.FieldCount; + var columns = new DataColumn[fieldCount]; + for (var i = 0; i < fieldCount; i++) + { + var colName = reader.GetName(i); + var colType = reader.GetFieldType(i); + var col = new DataColumn(colName, colType ?? typeof(object)); + col.AllowDBNull = true; + table.Columns.Add(col); + columns[i] = col; + } + + // Read rows + var values = new object[fieldCount]; + long rowCount = 0; + + table.BeginLoadData(); + try + { + while (reader.Read()) + { + reader.GetValues(values); + table.Rows.Add(values); + rowCount++; + + if (notifyAfter > 0 && rowCount % notifyAfter == 0) + { + if (onNotify != null && onNotify(rowCount)) + { + break; // Abort requested + } + } + } + } + finally + { + table.EndLoadData(); + } + } + } +} +#endif diff --git a/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpRowFormatter.cs b/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpRowFormatter.cs new file mode 100644 index 00000000..72e9e2b3 --- /dev/null +++ b/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpRowFormatter.cs @@ -0,0 +1,472 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.IO; +using System.Text; +using AdoNetCore.AseClient.Enum; + +namespace AdoNetCore.AseClient.Internal.BulkCopy +{ + /// + /// Constructs full native BCP row buffers for the TDS 5.0 bulk copy protocol. + /// + /// Native row format (from FreeTDS tds5_send_record): + /// [4-byte DOL padding (if datarows-locked)] + /// [1 byte: variable column count (actual non-trailing-null count)] + /// [1 byte: row number (always 0)] + /// [fixed column data -- packed sequentially, no length prefixes] + /// [2 bytes: total row size LE (only if variable columns present)] + /// [variable column data -- packed sequentially] + /// [offset table] + /// + /// The 2-byte row size field and offset table are only present when there + /// are non-trailing-null variable columns. + /// + /// On the wire, each row is preceded by a 2-byte LE length prefix (the total + /// record size), sent by the caller. + /// + internal class BcpRowFormatter + { + private readonly BcpColumnMetadata[] _columns; + private readonly bool _isDatarowsLocked; + private readonly Encoding _encoding; + private readonly int[] _sourceOrdinals; + private readonly List _fixedColumnIndices; + private readonly List _variableColumnIndices; + + /// + /// Creates a new BcpRowFormatter. + /// + /// Column metadata from the server's INSERT BULK response. + /// Whether the table uses datarows locking (requires 4-byte padding). + /// Character encoding for string data. + /// + /// Maps server column index to source (IDataReader) ordinal. + /// sourceOrdinals[i] is the reader ordinal for server column i, or -1 to skip. + /// + public BcpRowFormatter(BcpColumnMetadata[] columns, bool isDatarowsLocked, Encoding encoding, int[] sourceOrdinals) + { + _columns = columns; + _isDatarowsLocked = isDatarowsLocked; + _encoding = encoding; + _sourceOrdinals = sourceOrdinals; + _fixedColumnIndices = new List(); + _variableColumnIndices = new List(); + + for (var i = 0; i < columns.Length; i++) + { + if (columns[i].IsFixedLength) + { + _fixedColumnIndices.Add(i); + } + else + { + _variableColumnIndices.Add(i); + } + } + } + + /// + /// Formats a single row from an IDataReader into native BCP wire format. + /// + /// The data reader positioned at the current row. + /// The complete row buffer ready to write to the TDS stream. + public byte[] FormatRow(IDataReader reader) + { + using (var ms = new MemoryStream()) + { + // DOL padding for datarows-locked tables + if (_isDatarowsLocked) + { + ms.WriteByte(0); + ms.WriteByte(0); + ms.WriteByte(0); + ms.WriteByte(0); + } + + // Save position for varcount header (will be updated later) + var varColsPos = (int)ms.Position; + ms.WriteByte(0); // varcount placeholder + ms.WriteByte(0); // row number (always 0) + + // Write fixed columns sequentially (no length prefix) + for (var i = 0; i < _fixedColumnIndices.Count; i++) + { + var colIdx = _fixedColumnIndices[i]; + var srcOrd = _sourceOrdinals[colIdx]; + var value = srcOrd >= 0 ? reader.GetValue(srcOrd) : DBNull.Value; + BcpValueWriter.WriteFixedValue(ms, value, _columns[colIdx], _encoding); + } + + // Variable columns section + var rowSzPos = (int)ms.Position; + var varCount = _variableColumnIndices.Count; + + if (varCount > 0) + { + // Reserve 2 bytes for total row size placeholder + ms.WriteByte(0); + ms.WriteByte(0); + + // offsets[0] = absolute position of start of variable data + var offsets = new int[varCount + 1]; + offsets[0] = (int)ms.Position; + var ncols = 0; + + for (var i = 0; i < varCount; i++) + { + var colIdx = _variableColumnIndices[i]; + var srcOrd = _sourceOrdinals[colIdx]; + var value = srcOrd >= 0 ? reader.GetValue(srcOrd) : DBNull.Value; + var isNull = value == null || value == DBNull.Value; + + if (!isNull) + { + var written = BcpValueWriter.WriteVariableValue(ms, value, _columns[colIdx], _encoding); + // Empty varchar/char → write single space byte (FreeTDS convention) + if (written == 0 && IsStringType(_columns[colIdx].DataType)) + { + ms.WriteByte(0x20); + } + } + + offsets[++ncols] = (int)ms.Position; + } + + // Strip trailing null columns (where consecutive offsets are equal) + while (ncols > 0 && offsets[ncols] == offsets[ncols - 1]) + { + ncols--; + } + + if (ncols > 0) + { + // Write offset table + if (_isDatarowsLocked) + { + WriteDolOffsetTable(ms, offsets, ncols); + } + else + { + WriteAplOffsetTable(ms, offsets, ncols); + } + + // Fill in total row size at rowSzPos (2-byte LE) + var totalRowSize = (int)ms.Position; + ms.Position = rowSzPos; + ms.WriteByte((byte)(totalRowSize & 0xFF)); + ms.WriteByte((byte)((totalRowSize >> 8) & 0xFF)); + + // Fill in actual var cols written at varColsPos + ms.Position = varColsPos; + ms.WriteByte((byte)ncols); + + // Seek back to end + ms.Position = totalRowSize; + } + else + { + // All variable columns are trailing nulls - remove 2-byte placeholder + ms.SetLength(rowSzPos); + ms.Position = rowSzPos; + } + } + + return ms.ToArray(); + } + } + + /// + /// Formats a single row from an array of values into native BCP wire format. + /// + /// Column values indexed by server column ordinal. + /// The complete row buffer ready to write to the TDS stream. + public byte[] FormatRow(object[] values) + { + using (var ms = new MemoryStream()) + { + // DOL padding for datarows-locked tables + if (_isDatarowsLocked) + { + ms.WriteByte(0); + ms.WriteByte(0); + ms.WriteByte(0); + ms.WriteByte(0); + } + + // Save position for varcount header + var varColsPos = (int)ms.Position; + ms.WriteByte(0); // varcount placeholder + ms.WriteByte(0); // row number (always 0) + + // Write fixed columns sequentially + for (var i = 0; i < _fixedColumnIndices.Count; i++) + { + var colIdx = _fixedColumnIndices[i]; + var srcOrd = _sourceOrdinals[colIdx]; + var value = srcOrd >= 0 && srcOrd < values.Length ? values[srcOrd] : DBNull.Value; + BcpValueWriter.WriteFixedValue(ms, value, _columns[colIdx], _encoding); + } + + // Variable columns section + var rowSzPos = (int)ms.Position; + var varCount = _variableColumnIndices.Count; + + if (varCount > 0) + { + // Reserve 2 bytes for total row size placeholder + ms.WriteByte(0); + ms.WriteByte(0); + + var offsets = new int[varCount + 1]; + offsets[0] = (int)ms.Position; + var ncols = 0; + + for (var i = 0; i < varCount; i++) + { + var colIdx = _variableColumnIndices[i]; + var srcOrd = _sourceOrdinals[colIdx]; + var value = srcOrd >= 0 && srcOrd < values.Length ? values[srcOrd] : DBNull.Value; + var isNull = value == null || value == DBNull.Value; + + if (!isNull) + { + var written = BcpValueWriter.WriteVariableValue(ms, value, _columns[colIdx], _encoding); + if (written == 0 && IsStringType(_columns[colIdx].DataType)) + { + ms.WriteByte(0x20); + } + } + + offsets[++ncols] = (int)ms.Position; + } + + while (ncols > 0 && offsets[ncols] == offsets[ncols - 1]) + { + ncols--; + } + + if (ncols > 0) + { + if (_isDatarowsLocked) + { + WriteDolOffsetTable(ms, offsets, ncols); + } + else + { + WriteAplOffsetTable(ms, offsets, ncols); + } + + var totalRowSize = (int)ms.Position; + ms.Position = rowSzPos; + ms.WriteByte((byte)(totalRowSize & 0xFF)); + ms.WriteByte((byte)((totalRowSize >> 8) & 0xFF)); + + ms.Position = varColsPos; + ms.WriteByte((byte)ncols); + + ms.Position = totalRowSize; + } + else + { + ms.SetLength(rowSzPos); + ms.Position = rowSzPos; + } + } + + return ms.ToArray(); + } + } + + private static bool IsStringType(TdsDataType type) + { + return type == TdsDataType.TDS_CHAR + || type == TdsDataType.TDS_VARCHAR + || type == TdsDataType.TDS_LONGCHAR; + } + + /// + /// Writes APL (allpages-locked) offset table. + /// From FreeTDS: adjustment bytes (if offsets > 255), then count byte, then + /// low bytes of offsets in reverse order (offsets[ncols] down to offsets[0]). + /// Total of ncols+1 offset entries. + /// + private static void WriteAplOffsetTable(Stream ms, int[] offsets, int ncols) + { + var pfxTop = offsets[ncols] >> 8; + + if (pfxTop == 0) + { + // Simple case: all offsets < 256 + ms.WriteByte((byte)(ncols + 1)); + } + else + { + // Need adjustment table for high bytes + if (offsets[ncols] / 256 == offsets[ncols - 1] / 256) + { + ms.WriteByte((byte)(ncols + 1)); + } + + // Run-length prefix encoding for high bytes + var topCopy = pfxTop; + while (topCopy > 0) + { + var nPfx = 1; + for (var i = 0; i <= ncols; i++) + { + if ((offsets[i] >> 8) < topCopy) + { + nPfx++; + } + } + ms.WriteByte((byte)nPfx); + topCopy--; + } + } + + // Write low bytes of all offsets in reverse order + for (var i = 0; i <= ncols; i++) + { + ms.WriteByte((byte)(offsets[ncols - i] & 0xFF)); + } + } + + /// + /// Writes DOL (datarows-locked) offset table. + /// Each non-fixed column gets a 2-byte absolute offset in reverse order. + /// Does not include the "one past the end" offset that APL format has. + /// + private static void WriteDolOffsetTable(Stream ms, int[] offsets, int ncols) + { + for (var col = ncols - 1; col >= 0; col--) + { + ms.WriteByte((byte)(offsets[col] & 0xFF)); + ms.WriteByte((byte)((offsets[col] >> 8) & 0xFF)); + } + } + + /// + /// Resolves column mappings to produce source ordinals. + /// Returns an array where result[i] is the source reader ordinal for server column i. + /// + public static int[] ResolveColumnMappings( + BcpColumnMetadata[] serverColumns, + IDataReader reader, + AseBulkCopyColumnMappingCollection mappings) + { + var sourceOrdinals = new int[serverColumns.Length]; + + if (mappings == null || mappings.Count == 0) + { + var readerFieldCount = reader.FieldCount; + + // Count non-identity server columns + var nonIdentityCount = 0; + for (var i = 0; i < serverColumns.Length; i++) + { + if (!serverColumns[i].IsIdentity) + { + nonIdentityCount++; + } + } + + if (readerFieldCount == serverColumns.Length) + { + // Reader has same number of fields as server columns (including identity). + // Map 1:1 by ordinal — the reader is providing identity values too + // (e.g., BCP OUT -> BCP IN pipeline for the same table). + for (var i = 0; i < serverColumns.Length; i++) + { + sourceOrdinals[i] = i; + } + } + else + { + // Reader has fewer fields — skip identity columns and map sequentially + var readerOrdinal = 0; + for (var i = 0; i < serverColumns.Length; i++) + { + if (serverColumns[i].IsIdentity) + { + sourceOrdinals[i] = -1; + } + else if (readerOrdinal < readerFieldCount) + { + sourceOrdinals[i] = readerOrdinal; + readerOrdinal++; + } + else + { + sourceOrdinals[i] = -1; + } + } + } + } + else + { + // Initialize all to -1 (unmapped) + for (var i = 0; i < sourceOrdinals.Length; i++) + { + sourceOrdinals[i] = -1; + } + + // Apply explicit mappings + foreach (var mapping in mappings) + { + var destIdx = ResolveDestinationOrdinal(serverColumns, mapping); + var srcIdx = ResolveSourceOrdinal(reader, mapping); + + if (destIdx >= 0 && destIdx < serverColumns.Length && srcIdx >= 0) + { + sourceOrdinals[destIdx] = srcIdx; + } + } + } + + return sourceOrdinals; + } + + private static int ResolveDestinationOrdinal(BcpColumnMetadata[] serverColumns, AseBulkCopyColumnMapping mapping) + { + if (mapping.DestinationOrdinal >= 0) + { + return mapping.DestinationOrdinal; + } + + if (!string.IsNullOrEmpty(mapping.DestinationColumn)) + { + for (var i = 0; i < serverColumns.Length; i++) + { + if (string.Equals(serverColumns[i].Name, mapping.DestinationColumn, StringComparison.OrdinalIgnoreCase)) + { + return i; + } + } + } + + return -1; + } + + private static int ResolveSourceOrdinal(IDataReader reader, AseBulkCopyColumnMapping mapping) + { + if (mapping.SourceOrdinal >= 0) + { + return mapping.SourceOrdinal; + } + + if (!string.IsNullOrEmpty(mapping.SourceColumn)) + { + for (var i = 0; i < reader.FieldCount; i++) + { + if (string.Equals(reader.GetName(i), mapping.SourceColumn, StringComparison.OrdinalIgnoreCase)) + { + return i; + } + } + } + + return -1; + } + } +} diff --git a/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpValueWriter.cs b/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpValueWriter.cs new file mode 100644 index 00000000..de2ab643 --- /dev/null +++ b/src/AdoNetCore.AseClient/Internal/BulkCopy/BcpValueWriter.cs @@ -0,0 +1,467 @@ +using System; +using System.IO; +using System.Text; +using AdoNetCore.AseClient.Enum; + +namespace AdoNetCore.AseClient.Internal.BulkCopy +{ + /// + /// Writes values in native BCP format for bulk copy operations. + /// Unlike the regular ValueWriter which uses TDS token format (with length prefixes for nullable types), + /// BCP native format writes fixed-length types as raw bytes with no prefix, and variable-length types + /// as raw data whose length is tracked via the offset table. + /// + internal static class BcpValueWriter + { + /// + /// Writes a fixed-length value in native BCP format (no length prefix). + /// For null values on fixed-length types, writes zero-filled bytes. + /// + public static void WriteFixedValue(Stream stream, object value, BcpColumnMetadata column, Encoding encoding) + { + if (value == null || value == DBNull.Value) + { + if (column.DataType == TdsDataType.TDS_CHAR) + { + // Pad with spaces for char columns + for (var p = 0; p < column.FixedByteSize; p++) + { + stream.WriteByte(0x20); + } + } + else + { + WriteZeroFill(stream, column.FixedByteSize); + } + return; + } + + switch (column.DataType) + { + case TdsDataType.TDS_BIT: + stream.WriteByte(Convert.ToBoolean(value) ? (byte)1 : (byte)0); + break; + case TdsDataType.TDS_INT1: + stream.WriteByte(Convert.ToByte(value)); + break; + case TdsDataType.TDS_INT2: + stream.WriteShort(Convert.ToInt16(value)); + break; + case TdsDataType.TDS_UINT2: + stream.WriteUShort(Convert.ToUInt16(value)); + break; + case TdsDataType.TDS_INT4: + stream.WriteInt(Convert.ToInt32(value)); + break; + case TdsDataType.TDS_UINT4: + stream.WriteUInt(Convert.ToUInt32(value)); + break; + case TdsDataType.TDS_INT8: + stream.WriteLong(Convert.ToInt64(value)); + break; + case TdsDataType.TDS_UINT8: + stream.WriteULong(Convert.ToUInt64(value)); + break; + case TdsDataType.TDS_FLT4: + stream.WriteFloat(Convert.ToSingle(value)); + break; + case TdsDataType.TDS_FLT8: + stream.WriteDouble(Convert.ToDouble(value)); + break; + case TdsDataType.TDS_DATETIME: + stream.WriteIntPartDateTime(Convert.ToDateTime(value)); + break; + case TdsDataType.TDS_SHORTDATE: + WriteShortDate(stream, Convert.ToDateTime(value)); + break; + case TdsDataType.TDS_DATE: + stream.WriteDate(Convert.ToDateTime(value)); + break; + case TdsDataType.TDS_TIME: + WriteTimeValue(stream, value); + break; + case TdsDataType.TDS_MONEY: + WriteMoneyRaw(stream, Convert.ToDecimal(value)); + break; + case TdsDataType.TDS_SHORTMONEY: + WriteShortMoney(stream, Convert.ToDecimal(value)); + break; + case TdsDataType.TDS_CHAR: + WriteFixedChar(stream, value, column.FixedByteSize, encoding); + break; + case TdsDataType.TDS_BINARY: + WriteFixedBinary(stream, value, column.FixedByteSize); + break; + case TdsDataType.TDS_DECN: + case TdsDataType.TDS_NUMN: + WriteFixedDecimal(stream, value, column.FixedByteSize); + break; + default: + throw new NotSupportedException($"Unsupported fixed-length BCP type: {column.DataType}"); + } + } + + /// + /// Writes a variable-length value in native BCP format (raw bytes, no length prefix). + /// The caller is responsible for tracking the offset via the offset table. + /// Returns the number of bytes written. + /// + public static int WriteVariableValue(Stream stream, object value, BcpColumnMetadata column, Encoding encoding) + { + if (value == null || value == DBNull.Value) + { + return 0; + } + + switch (column.DataType) + { + case TdsDataType.TDS_INTN: + return WriteIntN(stream, value, column.Length); + case TdsDataType.TDS_UINTN: + return WriteUIntN(stream, value, column.Length); + case TdsDataType.TDS_FLTN: + return WriteFloatN(stream, value, column.Length); + case TdsDataType.TDS_DATETIMEN: + return WriteDateTimeN(stream, value, column.Length); + case TdsDataType.TDS_BIGDATETIMEN: + return WriteBigDateTimeN(stream, value); + case TdsDataType.TDS_DATEN: + return WriteDateN(stream, value); + case TdsDataType.TDS_TIMEN: + return WriteTimeN(stream, value); + case TdsDataType.TDS_MONEYN: + return WriteMoneyN(stream, value, column.Length); + case TdsDataType.TDS_CHAR: + case TdsDataType.TDS_VARCHAR: + return WriteString(stream, value, column.Length, encoding); + case TdsDataType.TDS_LONGCHAR: + return WriteLongString(stream, value, column.Length, encoding); + case TdsDataType.TDS_BINARY: + case TdsDataType.TDS_VARBINARY: + return WriteBinary(stream, value, column.Length); + case TdsDataType.TDS_LONGBINARY: + return WriteLongBinary(stream, value, column); + case TdsDataType.TDS_DECN: + case TdsDataType.TDS_NUMN: + return WriteDecimal(stream, value); + default: + throw new NotSupportedException($"Unsupported variable-length BCP type: {column.DataType}"); + } + } + + private static void WriteZeroFill(Stream stream, int count) + { + for (var i = 0; i < count; i++) + { + stream.WriteByte(0); + } + } + + private static void WriteFixedChar(Stream stream, object value, int fixedSize, Encoding encoding) + { + var str = Convert.ToString(value) ?? string.Empty; + var bytes = encoding.GetBytes(str); + if (bytes.Length >= fixedSize) + { + stream.Write(bytes, 0, fixedSize); + } + else + { + stream.Write(bytes, 0, bytes.Length); + // Pad with spaces + for (var p = bytes.Length; p < fixedSize; p++) + { + stream.WriteByte(0x20); + } + } + } + + private static void WriteFixedBinary(Stream stream, object value, int fixedSize) + { + var bytes = value as byte[]; + if (bytes != null) + { + if (bytes.Length >= fixedSize) + { + stream.Write(bytes, 0, fixedSize); + } + else + { + stream.Write(bytes, 0, bytes.Length); + WriteZeroFill(stream, fixedSize - bytes.Length); + } + } + else + { + WriteZeroFill(stream, fixedSize); + } + } + + private static void WriteFixedDecimal(Stream stream, object value, int fixedSize) + { + if (value is AseDecimal ad) + { + stream.WriteByte(ad.IsPositive ? (byte)0 : (byte)1); + var data = ad.BinData; + var dataLen = fixedSize - 1; + var buf = new byte[dataLen]; + var iData = 0; + for (var iBuf = buf.Length - 1; iBuf >= 0 && iData < data.Length; iBuf--, iData++) + { + buf[iBuf] = data[iData]; + } + stream.Write(buf, 0, buf.Length); + } + else + { + var dec = Convert.ToDecimal(value); + var sqlDec = (SqlDecimal)dec; + stream.WriteByte(sqlDec.IsPositive ? (byte)0 : (byte)1); + var data = sqlDec.BinData; + var dataLen = fixedSize - 1; + var buf = new byte[dataLen]; + var iData = 0; + for (var iBuf = buf.Length - 1; iBuf >= 0 && iData < data.Length; iBuf--, iData++) + { + buf[iBuf] = data[iData]; + } + stream.Write(buf, 0, buf.Length); + } + } + + private static void WriteShortDate(Stream stream, DateTime value) + { + var span = value - Constants.Sql.RegularDateTime.Epoch; + var days = (ushort)span.Days; + var minutes = (ushort)(span.Hours * 60 + span.Minutes); + stream.WriteUShort(days); + stream.WriteUShort(minutes); + } + + private static void WriteTimeValue(Stream stream, object value) + { + TimeSpan ts; + if (value is TimeSpan span) + { + ts = span; + } + else + { + ts = Convert.ToDateTime(value).TimeOfDay; + } + stream.WriteTime(ts); + } + + private static void WriteMoneyRaw(Stream stream, decimal value) + { + // Money is stored as 8 bytes: high 4 bytes then low 4 bytes of a scaled int64 + var scaled = Convert.ToInt64(decimal.Truncate(value * 10000m)); + var buf = BitConverter.GetBytes(scaled); + // Write high 4 bytes first, then low 4 bytes + stream.Write(buf, 4, 4); + stream.Write(buf, 0, 4); + } + + private static void WriteShortMoney(Stream stream, decimal value) + { + var scaled = Convert.ToInt32(decimal.Truncate(value * 10000m)); + stream.WriteInt(scaled); + } + + private static int WriteIntN(Stream stream, object value, int maxLength) + { + switch (maxLength) + { + case 1: + stream.WriteByte(Convert.ToByte(value)); + return 1; + case 2: + stream.WriteShort(Convert.ToInt16(value)); + return 2; + case 4: + stream.WriteInt(Convert.ToInt32(value)); + return 4; + case 8: + stream.WriteLong(Convert.ToInt64(value)); + return 8; + default: + stream.WriteInt(Convert.ToInt32(value)); + return 4; + } + } + + private static int WriteUIntN(Stream stream, object value, int maxLength) + { + switch (maxLength) + { + case 1: + stream.WriteByte(Convert.ToByte(value)); + return 1; + case 2: + stream.WriteUShort(Convert.ToUInt16(value)); + return 2; + case 4: + stream.WriteUInt(Convert.ToUInt32(value)); + return 4; + case 8: + stream.WriteULong(Convert.ToUInt64(value)); + return 8; + default: + stream.WriteUInt(Convert.ToUInt32(value)); + return 4; + } + } + + private static int WriteFloatN(Stream stream, object value, int maxLength) + { + if (maxLength == 4) + { + stream.WriteFloat(Convert.ToSingle(value)); + return 4; + } + stream.WriteDouble(Convert.ToDouble(value)); + return 8; + } + + private static int WriteDateTimeN(Stream stream, object value, int maxLength) + { + var dt = Convert.ToDateTime(value); + if (maxLength == 4) + { + WriteShortDate(stream, dt); + return 4; + } + stream.WriteIntPartDateTime(dt); + return 8; + } + + private static int WriteBigDateTimeN(Stream stream, object value) + { + var dt = Convert.ToDateTime(value); + var timeSinceEpoch = dt - Constants.Sql.BigDateTime.Epoch; + var msSinceEpoch = timeSinceEpoch.Ticks / TimeSpan.TicksPerMillisecond; + var usSinceEpoch = msSinceEpoch * 1000; + var usSinceYearZero = usSinceEpoch + Constants.Sql.BigDateTime.EpochMicroSeconds; + stream.WriteLong(usSinceYearZero); + return 8; + } + + private static int WriteDateN(Stream stream, object value) + { + stream.WriteDate(Convert.ToDateTime(value)); + return 4; + } + + private static int WriteTimeN(Stream stream, object value) + { + TimeSpan ts; + if (value is TimeSpan span) + { + ts = span; + } + else + { + ts = Convert.ToDateTime(value).TimeOfDay; + } + stream.WriteTime(ts); + return 4; + } + + private static int WriteMoneyN(Stream stream, object value, int maxLength) + { + var d = Convert.ToDecimal(value); + if (maxLength == 4) + { + WriteShortMoney(stream, d); + return 4; + } + WriteMoneyRaw(stream, d); + return 8; + } + + private static int WriteString(Stream stream, object value, int maxLength, Encoding encoding) + { + var str = Convert.ToString(value); + var bytes = encoding.GetBytes(str); + var len = Math.Min(bytes.Length, maxLength); + stream.Write(bytes, 0, len); + return len; + } + + private static int WriteLongString(Stream stream, object value, int maxLength, Encoding encoding) + { + var str = Convert.ToString(value); + var bytes = encoding.GetBytes(str); + var len = Math.Min(bytes.Length, maxLength); + stream.Write(bytes, 0, len); + return len; + } + + private static int WriteBinary(Stream stream, object value, int maxLength) + { + var bytes = (byte[])value; + var len = Math.Min(bytes.Length, maxLength); + stream.Write(bytes, 0, len); + return len; + } + + private static int WriteLongBinary(Stream stream, object value, BcpColumnMetadata column) + { + if (value is string s) + { + // unichar/univarchar types + if (column.UserType == 34 || column.UserType == 35) + { + var bytes = Encoding.Unicode.GetBytes(s); + var len = Math.Min(bytes.Length, column.Length); + stream.Write(bytes, 0, len); + return len; + } + } + + var ba = value as byte[]; + if (ba != null) + { + var len = Math.Min(ba.Length, column.Length); + stream.Write(ba, 0, len); + return len; + } + + return 0; + } + + private static int WriteDecimal(Stream stream, object value) + { + if (value is AseDecimal ad) + { + var requiredBytes = ad.BytesRequired; + stream.WriteByte(ad.IsPositive ? (byte)0 : (byte)1); + var data = ad.BinData; + var buf = new byte[requiredBytes]; + var iData = 0; + for (var iBuf = buf.Length - 1; iBuf >= 0; iBuf--, iData++) + { + buf[iBuf] = data[iData]; + } + stream.Write(buf, 0, buf.Length); + return 1 + requiredBytes; + } + else + { + var dec = Convert.ToDecimal(value); + var sqlDec = (SqlDecimal)dec; + var requiredBytes = sqlDec.BytesRequired; + stream.WriteByte(sqlDec.IsPositive ? (byte)0 : (byte)1); + var data = sqlDec.BinData; + var buf = new byte[requiredBytes]; + var iData = 0; + for (var iBuf = buf.Length - 1; iBuf >= 0; iBuf--, iData++) + { + buf[iBuf] = data[iData]; + } + stream.Write(buf, 0, buf.Length); + return 1 + requiredBytes; + } + } + } +} diff --git a/src/AdoNetCore.AseClient/Internal/InternalConnection.cs b/src/AdoNetCore.AseClient/Internal/InternalConnection.cs index 50194694..b6259c4e 100644 --- a/src/AdoNetCore.AseClient/Internal/InternalConnection.cs +++ b/src/AdoNetCore.AseClient/Internal/InternalConnection.cs @@ -11,6 +11,7 @@ using System.Threading.Tasks; using AdoNetCore.AseClient.Enum; using AdoNetCore.AseClient.Interface; +using AdoNetCore.AseClient.Internal.BulkCopy; using AdoNetCore.AseClient.Internal.Handler; using AdoNetCore.AseClient.Packet; using AdoNetCore.AseClient.Token; @@ -479,7 +480,19 @@ public DbDataReader ExecuteReader(CommandBehavior behavior, AseCommand command, public Task ExecuteReaderTaskRunnable(CommandBehavior behavior, AseCommand command, AseTransaction transaction) { var readerSource = new TaskCompletionSource(); - InternalExecuteQueryAsync(command, transaction, readerSource, behavior); + + // If SequentialAccess is requested, run on background thread for streaming support. + // This allows the reader to be returned as soon as first data arrives, while + // background thread continues receiving remaining data into the BlockingCollection. + if ((behavior & CommandBehavior.SequentialAccess) == CommandBehavior.SequentialAccess) + { + Task.Run(() => InternalExecuteQueryAsync(command, transaction, readerSource, behavior)); + } + else + { + InternalExecuteQueryAsync(command, transaction, readerSource, behavior); + } + return readerSource.Task; } @@ -697,6 +710,265 @@ private IToken[] BuildParameterTokens(AseCommand command) }; } + public void ExecuteBulkCopy( + string destinationTableName, + IDataReader reader, + AseBulkCopyColumnMappingCollection columnMappings, + int batchSize, + int notifyAfter, + Func onNotify, + AseTransaction transaction) + { + AssertExecutionStart(); + + try + { + // Step 1: Check if table uses datarows locking (affects row format padding) + var isDatarowsLocked = CheckDatarowsLocking(destinationTableName); + + // Step 2: Send "insert bulk " via normal language token + SendPacket(new NormalPacket(new LanguageToken + { + HasParameters = false, + CommandText = "insert bulk " + destinationTableName + })); + + // Step 3: Receive response - server sends column metadata as a result set + var envChangeTokenHandler = new EnvChangeTokenHandler(_environment, _parameters.Charset); + var messageHandler = new MessageTokenHandler(EventNotifier); + var dataReaderHandler = new DataReaderTokenHandler(); + var doneHandler = new DoneTokenHandler(); + + ReceiveTokens( + envChangeTokenHandler, + messageHandler, + dataReaderHandler, + doneHandler); + + messageHandler.AssertNoErrors(); + + // Step 4: Parse column metadata from the response + var serverColumns = ParseBulkColumnMetadata(dataReaderHandler); + + if (serverColumns.Length == 0) + { + throw new AseException("Server returned no column metadata for bulk insert."); + } + + // Step 5: Resolve column mappings + var sourceOrdinals = BcpRowFormatter.ResolveColumnMappings(serverColumns, reader, columnMappings); + + // Step 6: Create row formatter for native BCP format + var formatter = new BcpRowFormatter(serverColumns, isDatarowsLocked, _environment.Encoding, sourceOrdinals); + + // Step 7: Send rows in bulk mode + // TDS 5.0 BCP wire format (from FreeTDS): [2-byte row_size LE] [native row data] + // No TDS_ROW token prefix for TDS 5.0 BCP. + long totalRowsCopied = 0; + var currentBatchRows = 0; + var effectiveBatchSize = batchSize > 0 ? batchSize : int.MaxValue; + + lock (_sendMutex) + { +#if ENABLE_ARRAY_POOL + using (var tokenStream = new TokenSendStream(_networkStream, _environment, _arrayPool)) +#else + using (var tokenStream = new TokenSendStream(_networkStream, _environment)) +#endif + { + tokenStream.SetBufferType(BufferType.TDS_BUF_BULK, BufferStatus.TDS_BUFSTAT_NONE); + + while (reader.Read()) + { + var rowBytes = formatter.FormatRow(reader); + + // Write 2-byte row length prefix (LE) then native row data + var rowLenBytes = BitConverter.GetBytes((short)rowBytes.Length); + tokenStream.Write(rowLenBytes, 0, 2); + tokenStream.Write(rowBytes, 0, rowBytes.Length); + + currentBatchRows++; + totalRowsCopied++; + + // Check for batch completion + if (currentBatchRows >= effectiveBatchSize) + { + // Flush current batch + tokenStream.Flush(); + + // Receive DONE token for this batch + var batchDoneHandler = new DoneTokenHandler(); + var batchMsgHandler = new MessageTokenHandler(EventNotifier); + ReceiveTokens(batchMsgHandler, batchDoneHandler); + batchMsgHandler.AssertNoErrors(); + + if (transaction != null && batchDoneHandler.TransactionState == TranState.TDS_TRAN_ABORT) + { + transaction.MarkAborted(); + throw new AseException("Transaction was aborted during bulk copy."); + } + + currentBatchRows = 0; + + // Start new bulk stream for next batch + tokenStream.SetBufferType(BufferType.TDS_BUF_BULK, BufferStatus.TDS_BUFSTAT_NONE); + } + + // Fire notification + if (notifyAfter > 0 && totalRowsCopied % notifyAfter == 0) + { + if (onNotify != null && onNotify(totalRowsCopied)) + { + // User requested abort + break; + } + } + } + + // Flush to send all data (EOM flag in TDS packet signals end of bulk data) + tokenStream.Flush(); + } + } + + // Receive final DONE token + var finalDoneHandler = new DoneTokenHandler(); + var finalMsgHandler = new MessageTokenHandler(EventNotifier); + ReceiveTokens(finalMsgHandler, finalDoneHandler); + finalMsgHandler.AssertNoErrors(); + + if (transaction != null && finalDoneHandler.TransactionState == TranState.TDS_TRAN_ABORT) + { + transaction.MarkAborted(); + } + + AssertExecutionCompletion(finalDoneHandler); + } + catch (AseException) + { + IsDoomed = true; + AssertExecutionCompletion(); + throw; + } + catch (Exception ex) + { + IsDoomed = true; + AssertExecutionCompletion(); + throw new AseException("Bulk copy failed: " + ex.Message, ex); + } + } + + /// + /// Checks if the target table uses datarows locking (DOL) by querying sysstat2 from sysobjects. + /// DOL-locked tables require 4-byte padding at the start of each BCP row. + /// + private bool CheckDatarowsLocking(string tableName) + { + // Temporarily release execution state to execute the probe query + TrySetState(InternalConnectionState.Ready, s => s == InternalConnectionState.Active); + + try + { + AssertExecutionStart(); + + // Query sysobjects to check the locking scheme via sysstat2 bit 0x1000 + // sysstat2 & 0x1000 != 0 means DOL (datarows-locked) + var probeSql = $"SELECT sysstat2 FROM sysobjects WHERE name = '{tableName.Replace("'", "''")}'"; + + SendPacket(new NormalPacket(new LanguageToken + { + HasParameters = false, + CommandText = probeSql + })); + + var envChange = new EnvChangeTokenHandler(_environment, _parameters.Charset); + var msgHandler = new MessageTokenHandler(EventNotifier); + var readerHandler = new DataReaderTokenHandler(); + var done = new DoneTokenHandler(); + + ReceiveTokens(envChange, msgHandler, readerHandler, done); + + // Don't throw on error - just default to false (allpages locking) + foreach (var result in readerHandler.Results()) + { + if (result.Rows.Count > 0 && result.Rows[0].Items.Length > 0) + { + var sysstat2 = Convert.ToInt32(result.Rows[0].Items[0]); + return (sysstat2 & 0x1000) != 0; + } + } + + return false; + } + catch + { + // If we can't determine locking, default to allpages (no DOL padding) + return false; + } + finally + { + // Restore Active state for the bulk copy operation + TrySetState(InternalConnectionState.Ready, s => s == InternalConnectionState.Active); + AssertExecutionStart(); + } + } + + /// + /// Parses column metadata from the server's INSERT BULK response. + /// The server returns a result set where each ROW describes one table column: + /// [0] minlen (INT2), [1] maxlen (INT2), [2] colcnt (INT2), + /// [3] name (VARCHAR), [4] colid (INT2), [5] type (INT1), + /// [6] length (INT4), [7] status (INT1), [8] offset (INT2), + /// [9] dflt (INT1), [10] prec (INT1), [11] scale (INT1) + /// + private static BcpColumnMetadata[] ParseBulkColumnMetadata(DataReaderTokenHandler readerHandler) + { + var columns = new List(); + + foreach (var result in readerHandler.Results()) + { + for (var i = 0; i < result.Rows.Count; i++) + { + var row = result.Rows[i].Items; + + var name = Convert.ToString(row[3])?.Trim() ?? string.Empty; + var tdsType = (TdsDataType)Convert.ToByte(row[5]); + var length = Convert.ToInt32(row[6]); + var status = Convert.ToByte(row[7]); + var offset = Convert.ToInt16(row[8]); + var prec = Convert.ToByte(row[10]); + var scale = Convert.ToByte(row[11]); + + // Map status bits to RowFormatItemStatus + // In the INSERT BULK metadata: + // 0x08 = nullable + // 0x10 = fixed-offset column (NOT identity) + var rowStatus = RowFormatItemStatus.TDS_ROW_UPDATABLE; + if ((status & 0x08) != 0) + { + rowStatus |= RowFormatItemStatus.TDS_ROW_NULLALLOWED; + } + + // Positive offset = fixed position in row; negative = variable column + var isFixed = offset >= 0; + + columns.Add(new BcpColumnMetadata + { + Ordinal = i, + Name = name, + DataType = tdsType, + Length = length, + IsFixedLength = isFixed, + Precision = prec, + Scale = scale, + UserType = 0, + RowStatus = rowStatus + }); + } + } + + return columns.ToArray(); + } + public void Dispose() { if (IsDisposed) diff --git a/src/AdoNetCore.AseClient/Token/ClientCapabilityToken.cs b/src/AdoNetCore.AseClient/Token/ClientCapabilityToken.cs index e9a763e3..968ccd28 100644 --- a/src/AdoNetCore.AseClient/Token/ClientCapabilityToken.cs +++ b/src/AdoNetCore.AseClient/Token/ClientCapabilityToken.cs @@ -33,7 +33,7 @@ public ClientCapabilityToken(bool enableServerPacketSize = true) ((byte)(Request10.DATA_DATETIMEN | Request10.DATA_INTN | Request10.DATA_LBIN | Request10.DATA_LCHAR | Request10.DATA_DEC | Request10.DATA_IMAGE | Request10.DATA_TEXT | Request10.DATA_NUM)), ((byte)(Request11.DATA_FLT8 | Request11.DATA_FLT4 | Request11.DATA_DATE4 | Request11.DATA_DATE8 | Request11.DATA_MNY4 | Request11.DATA_MNY8 | Request11.DATA_VBIN | Request11.DATA_BIN)), ((byte)(Request12.DATA_VCHAR | Request12.DATA_CHAR | Request12.DATA_BIT | Request12.DATA_INT4 | Request12.DATA_INT2 | Request12.DATA_INT1 | Request12.REQ_PARAM | Request12.REQ_MSG)), - ((byte)(Request13.REQ_DYNF /*| Request13.REQ_CURSOR | Request13.REQ_BCP*/ | Request13.REQ_MSTMT/* | Request13.REQ_EVT*/ | Request13.REQ_RPC | Request13.REQ_LANG/* | Request13.NONE*/)), + ((byte)(Request13.REQ_DYNF /*| Request13.REQ_CURSOR*/ | Request13.REQ_BCP | Request13.REQ_MSTMT/* | Request13.REQ_EVT*/ | Request13.REQ_RPC | Request13.REQ_LANG/* | Request13.NONE*/)), //cap response ((byte)TDS_CAPABILITY_TYPE.TDS_CAP_RESPONSE), diff --git a/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/AseBulkCopyColumnMappingTests.cs b/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/AseBulkCopyColumnMappingTests.cs new file mode 100644 index 00000000..9000500f --- /dev/null +++ b/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/AseBulkCopyColumnMappingTests.cs @@ -0,0 +1,170 @@ +using NUnit.Framework; + +namespace AdoNetCore.AseClient.Tests.Unit.BulkCopy +{ + [TestFixture] + public class AseBulkCopyColumnMappingTests + { + #region Constructor Tests + + [Test] + public void Constructor_ByOrdinals_SetsProperties() + { + var mapping = new AseBulkCopyColumnMapping(0, 1); + + Assert.AreEqual(0, mapping.SourceOrdinal); + Assert.AreEqual(1, mapping.DestinationOrdinal); + Assert.IsNull(mapping.SourceColumn); + Assert.IsNull(mapping.DestinationColumn); + } + + [Test] + public void Constructor_ByNames_SetsProperties() + { + var mapping = new AseBulkCopyColumnMapping("SourceCol", "DestCol"); + + Assert.AreEqual("SourceCol", mapping.SourceColumn); + Assert.AreEqual("DestCol", mapping.DestinationColumn); + Assert.AreEqual(-1, mapping.SourceOrdinal); + Assert.AreEqual(-1, mapping.DestinationOrdinal); + } + + [Test] + public void Constructor_NameToOrdinal_SetsProperties() + { + var mapping = new AseBulkCopyColumnMapping("SourceCol", 2); + + Assert.AreEqual("SourceCol", mapping.SourceColumn); + Assert.AreEqual(2, mapping.DestinationOrdinal); + Assert.AreEqual(-1, mapping.SourceOrdinal); + Assert.IsNull(mapping.DestinationColumn); + } + + [Test] + public void Constructor_OrdinalToName_SetsProperties() + { + var mapping = new AseBulkCopyColumnMapping(3, "DestCol"); + + Assert.AreEqual(3, mapping.SourceOrdinal); + Assert.AreEqual("DestCol", mapping.DestinationColumn); + Assert.IsNull(mapping.SourceColumn); + Assert.AreEqual(-1, mapping.DestinationOrdinal); + } + + #endregion + + #region Collection Tests + + [Test] + public void Collection_AddByNames_ReturnsMapping() + { + var collection = new AseBulkCopyColumnMappingCollection(); + + var mapping = collection.Add("src", "dest"); + + Assert.AreEqual(1, collection.Count); + Assert.AreEqual("src", mapping.SourceColumn); + Assert.AreEqual("dest", mapping.DestinationColumn); + } + + [Test] + public void Collection_AddByOrdinals_ReturnsMapping() + { + var collection = new AseBulkCopyColumnMappingCollection(); + + var mapping = collection.Add(0, 1); + + Assert.AreEqual(1, collection.Count); + Assert.AreEqual(0, mapping.SourceOrdinal); + Assert.AreEqual(1, mapping.DestinationOrdinal); + } + + [Test] + public void Collection_AddMultiple_HasCorrectCount() + { + var collection = new AseBulkCopyColumnMappingCollection(); + + collection.Add("a", "x"); + collection.Add("b", "y"); + collection.Add("c", "z"); + + Assert.AreEqual(3, collection.Count); + } + + [Test] + public void Collection_Clear_RemovesAll() + { + var collection = new AseBulkCopyColumnMappingCollection(); + collection.Add("a", "x"); + collection.Add("b", "y"); + + collection.Clear(); + + Assert.AreEqual(0, collection.Count); + } + + [Test] + public void Collection_Indexer_ReturnsCorrectMapping() + { + var collection = new AseBulkCopyColumnMappingCollection(); + collection.Add("a", "x"); + collection.Add("b", "y"); + + Assert.AreEqual("b", collection[1].SourceColumn); + } + + [Test] + public void Collection_RemoveAt_RemovesCorrectItem() + { + var collection = new AseBulkCopyColumnMappingCollection(); + collection.Add("a", "x"); + collection.Add("b", "y"); + collection.Add("c", "z"); + + collection.RemoveAt(1); + + Assert.AreEqual(2, collection.Count); + Assert.AreEqual("a", collection[0].SourceColumn); + Assert.AreEqual("c", collection[1].SourceColumn); + } + + [Test] + public void Collection_Enumerable_IteratesAll() + { + var collection = new AseBulkCopyColumnMappingCollection(); + collection.Add("a", "x"); + collection.Add("b", "y"); + + var count = 0; + foreach (var mapping in collection) + { + count++; + } + Assert.AreEqual(2, count); + } + + [Test] + public void Collection_AddNameToOrdinal() + { + var collection = new AseBulkCopyColumnMappingCollection(); + + var mapping = collection.Add("src", 5); + + Assert.AreEqual("src", mapping.SourceColumn); + Assert.AreEqual(5, mapping.DestinationOrdinal); + } + + [Test] + public void Collection_AddOrdinalToName() + { + var collection = new AseBulkCopyColumnMappingCollection(); + + var mapping = collection.Add(3, "dest"); + + Assert.AreEqual(3, mapping.SourceOrdinal); + Assert.AreEqual("dest", mapping.DestinationColumn); + } + + #endregion + } +} diff --git a/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/BcpRowFormatterTests.cs b/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/BcpRowFormatterTests.cs new file mode 100644 index 00000000..6c723f00 --- /dev/null +++ b/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/BcpRowFormatterTests.cs @@ -0,0 +1,528 @@ +#if ENABLE_SYSTEM_DATA_COMMON_EXTENSIONS +using System; +using System.Collections.Generic; +using System.Data; +using System.IO; +using System.Text; +using AdoNetCore.AseClient.Enum; +using AdoNetCore.AseClient.Internal.BulkCopy; +using NUnit.Framework; + +namespace AdoNetCore.AseClient.Tests.Unit.BulkCopy +{ + [TestFixture] + public class BcpRowFormatterTests + { + #region Helper Methods + + private static BcpColumnMetadata MakeCol(int ordinal, string name, TdsDataType type, int length = 0, + RowFormatItemStatus status = 0, bool isFixed = false) + { + return new BcpColumnMetadata + { + Ordinal = ordinal, + Name = name, + DataType = type, + Length = length, + RowStatus = status, + IsFixedLength = isFixed + }; + } + + private static DataTable CreateDataTable(params DataColumn[] columns) + { + var table = new DataTable(); + foreach (var col in columns) + { + table.Columns.Add(col); + } + return table; + } + + #endregion + + #region Basic Row Format + + [Test] + public void FormatRow_AllFixedColumns_CorrectLayout() + { + // Two fixed columns: INT4, INT2 + var columns = new[] + { + MakeCol(0, "col1", TdsDataType.TDS_INT4, isFixed: true), + MakeCol(1, "col2", TdsDataType.TDS_INT2, isFixed: true) + }; + + var sourceOrdinals = new[] { 0, 1 }; + var formatter = new BcpRowFormatter(columns, false, Encoding.ASCII, sourceOrdinals); + + var dt = CreateDataTable( + new DataColumn("col1", typeof(int)), + new DataColumn("col2", typeof(short))); + dt.Rows.Add(42, (short)7); + var reader = dt.CreateDataReader(); + reader.Read(); + + var row = formatter.FormatRow(reader); + + // Expected: [varCount=0] [rowNum=0] [INT4 data] [INT2 data] + // No row size field or offset table since no variable columns + Assert.AreEqual(8, row.Length); + using (var ms = new MemoryStream(row)) + { + Assert.AreEqual(0, ms.ReadByte()); // varCount = 0 + Assert.AreEqual(0, ms.ReadByte()); // rowNum = 0 + + var intBuf = new byte[4]; + ms.Read(intBuf, 0, 4); + Assert.AreEqual(42, BitConverter.ToInt32(intBuf, 0)); + + var shortBuf = new byte[2]; + ms.Read(shortBuf, 0, 2); + Assert.AreEqual(7, BitConverter.ToInt16(shortBuf, 0)); + + Assert.AreEqual(ms.Length, ms.Position); // No more data + } + } + + [Test] + public void FormatRow_AllVariableColumns_HasOffsetTable() + { + // Two variable columns: VARCHAR(50), INTN(4) + var columns = new[] + { + MakeCol(0, "name", TdsDataType.TDS_VARCHAR, 50), + MakeCol(1, "age", TdsDataType.TDS_INTN, 4) + }; + + var sourceOrdinals = new[] { 0, 1 }; + var formatter = new BcpRowFormatter(columns, false, Encoding.ASCII, sourceOrdinals); + + var dt = CreateDataTable( + new DataColumn("name", typeof(string)), + new DataColumn("age", typeof(int))); + dt.Rows.Add("Bob", 25); + var reader = dt.CreateDataReader(); + reader.Read(); + + var row = formatter.FormatRow(reader); + + // Native BCP format: + // [varCount=2][rowNum=0] (pos 0-1) + // [2-byte row_size LE] (pos 2-3) + // ["Bob" = 3 bytes] (pos 4-6) + // [INT4=25 = 4 bytes] (pos 7-10) + // [offset table] (pos 11-14) + // offsets: [0]=4, [1]=7, [2]=11 + // table: [count=3] [11] [7] [4] + // Total = 15 bytes, row_size = 15 + Assert.AreEqual(15, row.Length); + using (var ms = new MemoryStream(row)) + { + Assert.AreEqual(2, ms.ReadByte()); // varCount = 2 + Assert.AreEqual(0, ms.ReadByte()); // rowNum = 0 + + // 2-byte row size + var szBuf = new byte[2]; + ms.Read(szBuf, 0, 2); + Assert.AreEqual(15, BitConverter.ToUInt16(szBuf, 0)); + + // Variable data: "Bob" + var nameBuf = new byte[3]; + ms.Read(nameBuf, 0, 3); + Assert.AreEqual("Bob", Encoding.ASCII.GetString(nameBuf)); + + // Variable data: INT4 = 25 + var intBuf = new byte[4]; + ms.Read(intBuf, 0, 4); + Assert.AreEqual(25, BitConverter.ToInt32(intBuf, 0)); + + // Offset table: [count=3] [off2=11] [off1=7] [off0=4] + Assert.AreEqual(3, ms.ReadByte()); // count (ncols+1) + Assert.AreEqual(11, ms.ReadByte()); // offsets[2] (end of age) + Assert.AreEqual(7, ms.ReadByte()); // offsets[1] (end of name) + Assert.AreEqual(4, ms.ReadByte()); // offsets[0] (start of var data) + } + } + + [Test] + public void FormatRow_MixedFixedAndVariable() + { + // col0: INT4 (fixed), col1: VARCHAR(50) (variable) + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true), + MakeCol(1, "name", TdsDataType.TDS_VARCHAR, 50) + }; + + var sourceOrdinals = new[] { 0, 1 }; + var formatter = new BcpRowFormatter(columns, false, Encoding.ASCII, sourceOrdinals); + + var dt = CreateDataTable( + new DataColumn("id", typeof(int)), + new DataColumn("name", typeof(string))); + dt.Rows.Add(1, "Alice"); + var reader = dt.CreateDataReader(); + reader.Read(); + + var row = formatter.FormatRow(reader); + + // Format: + // [varCount=1][rowNum=0] (pos 0-1) + // [INT4=1] (pos 2-5, fixed) + // [2-byte row_size LE] (pos 6-7) + // ["Alice" = 5 bytes] (pos 8-12) + // [offset table] (pos 13-15) + // offsets: [0]=8, [1]=13 + // table: [count=2] [13] [8] + // Total = 16, row_size = 16 + Assert.AreEqual(16, row.Length); + using (var ms = new MemoryStream(row)) + { + Assert.AreEqual(1, ms.ReadByte()); // varCount = 1 + Assert.AreEqual(0, ms.ReadByte()); // rowNum = 0 + + // Fixed column: INT4 = 1 + var intBuf = new byte[4]; + ms.Read(intBuf, 0, 4); + Assert.AreEqual(1, BitConverter.ToInt32(intBuf, 0)); + + // 2-byte row size + var szBuf = new byte[2]; + ms.Read(szBuf, 0, 2); + Assert.AreEqual(16, BitConverter.ToUInt16(szBuf, 0)); + + // Variable column: "Alice" = 5 bytes + var strBuf = new byte[5]; + ms.Read(strBuf, 0, 5); + Assert.AreEqual("Alice", Encoding.ASCII.GetString(strBuf)); + + // Offset table: [count=2] [off1=13] [off0=8] + Assert.AreEqual(2, ms.ReadByte()); // count (ncols+1) + Assert.AreEqual(13, ms.ReadByte()); // offsets[1] (end of "Alice") + Assert.AreEqual(8, ms.ReadByte()); // offsets[0] (start of var data) + } + } + + #endregion + + #region DOL Padding + + [Test] + public void FormatRow_DatarowsLocked_Has4BytePadding() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true) + }; + + var sourceOrdinals = new[] { 0 }; + var formatter = new BcpRowFormatter(columns, true, Encoding.ASCII, sourceOrdinals); + + var dt = CreateDataTable(new DataColumn("id", typeof(int))); + dt.Rows.Add(99); + var reader = dt.CreateDataReader(); + reader.Read(); + + var row = formatter.FormatRow(reader); + + // Expected: [0,0,0,0] (DOL padding) [varCount=0] [rowNum=0] [INT4 data] + Assert.AreEqual(10, row.Length); + Assert.AreEqual(0, row[0]); + Assert.AreEqual(0, row[1]); + Assert.AreEqual(0, row[2]); + Assert.AreEqual(0, row[3]); + Assert.AreEqual(0, row[4]); // varCount + Assert.AreEqual(0, row[5]); // rowNum + Assert.AreEqual(99, BitConverter.ToInt32(row, 6)); + } + + [Test] + public void FormatRow_NotDatarowsLocked_NoPadding() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true) + }; + + var sourceOrdinals = new[] { 0 }; + var formatter = new BcpRowFormatter(columns, false, Encoding.ASCII, sourceOrdinals); + + var dt = CreateDataTable(new DataColumn("id", typeof(int))); + dt.Rows.Add(99); + var reader = dt.CreateDataReader(); + reader.Read(); + + var row = formatter.FormatRow(reader); + + // Expected: [varCount=0] [rowNum=0] [INT4 data] - total 6 bytes + Assert.AreEqual(6, row.Length); + Assert.AreEqual(0, row[0]); // varCount + Assert.AreEqual(0, row[1]); // rowNum + Assert.AreEqual(99, BitConverter.ToInt32(row, 2)); + } + + #endregion + + #region Null Handling + + [Test] + public void FormatRow_NullFixedColumn_ZeroFilled() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true, + status: RowFormatItemStatus.TDS_ROW_NULLALLOWED) + }; + + var sourceOrdinals = new[] { 0 }; + var formatter = new BcpRowFormatter(columns, false, Encoding.ASCII, sourceOrdinals); + + var dt = CreateDataTable(new DataColumn("id", typeof(object))); + dt.Rows.Add(DBNull.Value); + var reader = dt.CreateDataReader(); + reader.Read(); + + var row = formatter.FormatRow(reader); + + // Fixed null: zero-filled bytes + Assert.AreEqual(6, row.Length); + Assert.AreEqual(0, BitConverter.ToInt32(row, 2)); + } + + [Test] + public void FormatRow_NullVariableColumn_SameOffset() + { + // Two variable columns, first is null + var columns = new[] + { + MakeCol(0, "name", TdsDataType.TDS_VARCHAR, 50, + RowFormatItemStatus.TDS_ROW_NULLALLOWED), + MakeCol(1, "age", TdsDataType.TDS_INTN, 4) + }; + + var sourceOrdinals = new[] { 0, 1 }; + var formatter = new BcpRowFormatter(columns, false, Encoding.ASCII, sourceOrdinals); + + var dt = CreateDataTable( + new DataColumn("name", typeof(object)), + new DataColumn("age", typeof(int))); + dt.Rows.Add(DBNull.Value, 30); + var reader = dt.CreateDataReader(); + reader.Read(); + + var row = formatter.FormatRow(reader); + + // Format: + // [varCount=2][rowNum=0] (pos 0-1) + // [2-byte row_size LE] (pos 2-3) + // [0 bytes for null] (nothing) + // [INT4=30] (pos 4-7) + // [offset table] (pos 8-11) + // offsets: [0]=4, [1]=4 (null=same), [2]=8 + // table: [count=3] [8] [4] [4] + // Total = 12, row_size = 12 + Assert.AreEqual(12, row.Length); + using (var ms = new MemoryStream(row)) + { + Assert.AreEqual(2, ms.ReadByte()); // varCount = 2 + Assert.AreEqual(0, ms.ReadByte()); // rowNum = 0 + + // 2-byte row size + var szBuf = new byte[2]; + ms.Read(szBuf, 0, 2); + Assert.AreEqual(12, BitConverter.ToUInt16(szBuf, 0)); + + // Variable data: null writes 0 bytes, INT4 writes 4 bytes + var intBuf = new byte[4]; + ms.Read(intBuf, 0, 4); + Assert.AreEqual(30, BitConverter.ToInt32(intBuf, 0)); + + // Offset table: [count=3] [off2=8] [off1=4] [off0=4] + Assert.AreEqual(3, ms.ReadByte()); // count (ncols+1) + Assert.AreEqual(8, ms.ReadByte()); // offsets[2] (end of age) + Assert.AreEqual(4, ms.ReadByte()); // offsets[1] (null = same as offsets[0]) + Assert.AreEqual(4, ms.ReadByte()); // offsets[0] (start of var data) + } + } + + #endregion + + #region Column Mapping Resolution + + [Test] + public void ResolveColumnMappings_NoMappings_MapsByOrdinal() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true), + MakeCol(1, "name", TdsDataType.TDS_VARCHAR, 50), + MakeCol(2, "value", TdsDataType.TDS_FLT8, isFixed: true) + }; + + var dt = CreateDataTable( + new DataColumn("id", typeof(int)), + new DataColumn("name", typeof(string)), + new DataColumn("value", typeof(double))); + dt.Rows.Add(1, "test", 3.14); + var reader = dt.CreateDataReader(); + + var result = BcpRowFormatter.ResolveColumnMappings(columns, reader, null); + + Assert.AreEqual(3, result.Length); + Assert.AreEqual(0, result[0]); + Assert.AreEqual(1, result[1]); + Assert.AreEqual(2, result[2]); + } + + [Test] + public void ResolveColumnMappings_SkipsIdentityColumns() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true, + status: RowFormatItemStatus.TDS_ROW_IDENTITY), + MakeCol(1, "name", TdsDataType.TDS_VARCHAR, 50) + }; + + var dt = CreateDataTable( + new DataColumn("name", typeof(string))); + dt.Rows.Add("test"); + var reader = dt.CreateDataReader(); + + var result = BcpRowFormatter.ResolveColumnMappings(columns, reader, null); + + Assert.AreEqual(2, result.Length); + Assert.AreEqual(-1, result[0]); // identity skipped + Assert.AreEqual(0, result[1]); // maps to reader col 0 + } + + [Test] + public void ResolveColumnMappings_ExplicitMappings_ByName() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true), + MakeCol(1, "name", TdsDataType.TDS_VARCHAR, 50) + }; + + var dt = CreateDataTable( + new DataColumn("src_name", typeof(string)), + new DataColumn("src_id", typeof(int))); + dt.Rows.Add("test", 1); + var reader = dt.CreateDataReader(); + + var mappings = new AseBulkCopyColumnMappingCollection(); + mappings.Add("src_id", "id"); + mappings.Add("src_name", "name"); + + var result = BcpRowFormatter.ResolveColumnMappings(columns, reader, mappings); + + Assert.AreEqual(2, result.Length); + Assert.AreEqual(1, result[0]); // "id" -> reader col 1 (src_id) + Assert.AreEqual(0, result[1]); // "name" -> reader col 0 (src_name) + } + + [Test] + public void ResolveColumnMappings_ExplicitMappings_ByOrdinal() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true), + MakeCol(1, "name", TdsDataType.TDS_VARCHAR, 50) + }; + + var dt = CreateDataTable( + new DataColumn("a", typeof(string)), + new DataColumn("b", typeof(int))); + dt.Rows.Add("test", 1); + var reader = dt.CreateDataReader(); + + var mappings = new AseBulkCopyColumnMappingCollection(); + mappings.Add(1, 0); // reader col 1 -> server col 0 + mappings.Add(0, 1); // reader col 0 -> server col 1 + + var result = BcpRowFormatter.ResolveColumnMappings(columns, reader, mappings); + + Assert.AreEqual(2, result.Length); + Assert.AreEqual(1, result[0]); + Assert.AreEqual(0, result[1]); + } + + [Test] + public void ResolveColumnMappings_UnmappedColumns_GetMinusOne() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true), + MakeCol(1, "name", TdsDataType.TDS_VARCHAR, 50), + MakeCol(2, "extra", TdsDataType.TDS_INT4, isFixed: true) + }; + + var dt = CreateDataTable( + new DataColumn("src_name", typeof(string))); + dt.Rows.Add("test"); + var reader = dt.CreateDataReader(); + + var mappings = new AseBulkCopyColumnMappingCollection(); + mappings.Add("src_name", "name"); + + var result = BcpRowFormatter.ResolveColumnMappings(columns, reader, mappings); + + Assert.AreEqual(-1, result[0]); // unmapped + Assert.AreEqual(0, result[1]); // mapped + Assert.AreEqual(-1, result[2]); // unmapped + } + + #endregion + + #region FormatRow with object[] overload + + [Test] + public void FormatRow_ObjectArray_Works() + { + var columns = new[] + { + MakeCol(0, "id", TdsDataType.TDS_INT4, isFixed: true), + MakeCol(1, "name", TdsDataType.TDS_VARCHAR, 50) + }; + + var sourceOrdinals = new[] { 0, 1 }; + var formatter = new BcpRowFormatter(columns, false, Encoding.ASCII, sourceOrdinals); + + var values = new object[] { 42, "Hello" }; + var row = formatter.FormatRow(values); + + // Same format as MixedFixedAndVariable + Assert.AreEqual(16, row.Length); + using (var ms = new MemoryStream(row)) + { + Assert.AreEqual(1, ms.ReadByte()); // varCount = 1 + Assert.AreEqual(0, ms.ReadByte()); // rowNum = 0 + + // Fixed: INT4 = 42 + var intBuf = new byte[4]; + ms.Read(intBuf, 0, 4); + Assert.AreEqual(42, BitConverter.ToInt32(intBuf, 0)); + + // 2-byte row size + var szBuf = new byte[2]; + ms.Read(szBuf, 0, 2); + Assert.AreEqual(16, BitConverter.ToUInt16(szBuf, 0)); + + // Variable: "Hello" = 5 bytes + var strBuf = new byte[5]; + ms.Read(strBuf, 0, 5); + Assert.AreEqual("Hello", Encoding.ASCII.GetString(strBuf)); + + // Offset table: [count=2] [off1=13] [off0=8] + Assert.AreEqual(2, ms.ReadByte()); // count + Assert.AreEqual(13, ms.ReadByte()); // offsets[1] + Assert.AreEqual(8, ms.ReadByte()); // offsets[0] + } + } + + #endregion + } +} +#endif diff --git a/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/BcpValueWriterTests.cs b/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/BcpValueWriterTests.cs new file mode 100644 index 00000000..a7aa7670 --- /dev/null +++ b/test/AdoNetCore.AseClient.Tests/Unit/BulkCopy/BcpValueWriterTests.cs @@ -0,0 +1,274 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using AdoNetCore.AseClient.Enum; +using AdoNetCore.AseClient.Internal.BulkCopy; +using NUnit.Framework; + +namespace AdoNetCore.AseClient.Tests.Unit.BulkCopy +{ + [TestFixture] + public class BcpValueWriterTests + { + private static BcpColumnMetadata MakeFixed(TdsDataType type) + { + return new BcpColumnMetadata { DataType = type }; + } + + private static BcpColumnMetadata MakeVariable(TdsDataType type, int length) + { + return new BcpColumnMetadata { DataType = type, Length = length }; + } + + #region Fixed-Length Types + + [Test] + public void WriteFixedValue_Bit_True() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, true, MakeFixed(TdsDataType.TDS_BIT), Encoding.ASCII); + Assert.AreEqual(new byte[] { 1 }, ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Bit_False() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, false, MakeFixed(TdsDataType.TDS_BIT), Encoding.ASCII); + Assert.AreEqual(new byte[] { 0 }, ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Int1() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, (byte)42, MakeFixed(TdsDataType.TDS_INT1), Encoding.ASCII); + Assert.AreEqual(new byte[] { 42 }, ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Int2() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, (short)1234, MakeFixed(TdsDataType.TDS_INT2), Encoding.ASCII); + Assert.AreEqual(BitConverter.GetBytes((short)1234), ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Int4() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, 123456, MakeFixed(TdsDataType.TDS_INT4), Encoding.ASCII); + Assert.AreEqual(BitConverter.GetBytes(123456), ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Int8() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, 123456789L, MakeFixed(TdsDataType.TDS_INT8), Encoding.ASCII); + Assert.AreEqual(BitConverter.GetBytes(123456789L), ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Flt4() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, 3.14f, MakeFixed(TdsDataType.TDS_FLT4), Encoding.ASCII); + Assert.AreEqual(BitConverter.GetBytes(3.14f), ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Flt8() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, 3.14159265, MakeFixed(TdsDataType.TDS_FLT8), Encoding.ASCII); + Assert.AreEqual(BitConverter.GetBytes(3.14159265), ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Null_ZeroFills() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, DBNull.Value, MakeFixed(TdsDataType.TDS_INT4), Encoding.ASCII); + Assert.AreEqual(new byte[] { 0, 0, 0, 0 }, ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_Null_Int8_ZeroFills() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, null, MakeFixed(TdsDataType.TDS_INT8), Encoding.ASCII); + Assert.AreEqual(new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 }, ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_UInt2() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, (ushort)65535, MakeFixed(TdsDataType.TDS_UINT2), Encoding.ASCII); + Assert.AreEqual(BitConverter.GetBytes((ushort)65535), ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_UInt4() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, 4294967295u, MakeFixed(TdsDataType.TDS_UINT4), Encoding.ASCII); + Assert.AreEqual(BitConverter.GetBytes(4294967295u), ms.ToArray()); + } + } + + [Test] + public void WriteFixedValue_UInt8() + { + using (var ms = new MemoryStream()) + { + BcpValueWriter.WriteFixedValue(ms, (ulong)18446744073709551615, MakeFixed(TdsDataType.TDS_UINT8), Encoding.ASCII); + Assert.AreEqual(BitConverter.GetBytes((ulong)18446744073709551615), ms.ToArray()); + } + } + + #endregion + + #region Variable-Length Types + + [Test] + public void WriteVariableValue_Varchar() + { + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, "hello", MakeVariable(TdsDataType.TDS_VARCHAR, 255), Encoding.ASCII); + Assert.AreEqual(5, len); + Assert.AreEqual(Encoding.ASCII.GetBytes("hello"), ms.ToArray()); + } + } + + [Test] + public void WriteVariableValue_Varchar_Truncates() + { + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, "hello world", MakeVariable(TdsDataType.TDS_VARCHAR, 5), Encoding.ASCII); + Assert.AreEqual(5, len); + Assert.AreEqual(Encoding.ASCII.GetBytes("hello"), ms.ToArray()); + } + } + + [Test] + public void WriteVariableValue_Null_ReturnsZero() + { + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, DBNull.Value, MakeVariable(TdsDataType.TDS_VARCHAR, 255), Encoding.ASCII); + Assert.AreEqual(0, len); + Assert.AreEqual(0, ms.Length); + } + } + + [Test] + public void WriteVariableValue_Binary() + { + var data = new byte[] { 0x01, 0x02, 0x03 }; + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, data, MakeVariable(TdsDataType.TDS_BINARY, 10), Encoding.ASCII); + Assert.AreEqual(3, len); + Assert.AreEqual(data, ms.ToArray()); + } + } + + [Test] + public void WriteVariableValue_IntN_4Byte() + { + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, 42, MakeVariable(TdsDataType.TDS_INTN, 4), Encoding.ASCII); + Assert.AreEqual(4, len); + Assert.AreEqual(BitConverter.GetBytes(42), ms.ToArray()); + } + } + + [Test] + public void WriteVariableValue_IntN_8Byte() + { + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, 42L, MakeVariable(TdsDataType.TDS_INTN, 8), Encoding.ASCII); + Assert.AreEqual(8, len); + Assert.AreEqual(BitConverter.GetBytes(42L), ms.ToArray()); + } + } + + [Test] + public void WriteVariableValue_FloatN_4Byte() + { + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, 3.14f, MakeVariable(TdsDataType.TDS_FLTN, 4), Encoding.ASCII); + Assert.AreEqual(4, len); + Assert.AreEqual(BitConverter.GetBytes(3.14f), ms.ToArray()); + } + } + + [Test] + public void WriteVariableValue_FloatN_8Byte() + { + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, 3.14, MakeVariable(TdsDataType.TDS_FLTN, 8), Encoding.ASCII); + Assert.AreEqual(8, len); + Assert.AreEqual(BitConverter.GetBytes(3.14), ms.ToArray()); + } + } + + [Test] + public void WriteVariableValue_LongChar() + { + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, "test string", MakeVariable(TdsDataType.TDS_LONGCHAR, 1000), Encoding.ASCII); + Assert.AreEqual(11, len); + Assert.AreEqual(Encoding.ASCII.GetBytes("test string"), ms.ToArray()); + } + } + + [Test] + public void WriteVariableValue_VarBinary() + { + var data = new byte[] { 0xDE, 0xAD, 0xBE, 0xEF }; + using (var ms = new MemoryStream()) + { + var len = BcpValueWriter.WriteVariableValue(ms, data, MakeVariable(TdsDataType.TDS_VARBINARY, 100), Encoding.ASCII); + Assert.AreEqual(4, len); + Assert.AreEqual(data, ms.ToArray()); + } + } + + #endregion + } +} diff --git a/test/AdoNetCore.AseClient.Tests/Unit/ConnectionPoolTests.cs b/test/AdoNetCore.AseClient.Tests/Unit/ConnectionPoolTests.cs index 9ca9081a..b38305cf 100644 --- a/test/AdoNetCore.AseClient.Tests/Unit/ConnectionPoolTests.cs +++ b/test/AdoNetCore.AseClient.Tests/Unit/ConnectionPoolTests.cs @@ -349,6 +349,15 @@ public IDictionary RetrieveStatistics() return new Dictionary(); } public IInfoMessageEventNotifier EventNotifier { get; set; } + + public void ExecuteBulkCopy( + string destinationTableName, + IDataReader reader, + AseBulkCopyColumnMappingCollection columnMappings, + int batchSize, + int notifyAfter, + Func onNotify, + AseTransaction transaction) { } } private class SlowConnectionFactory : IInternalConnectionFactory