@@ -233,10 +233,13 @@ private void syncCommand() {
233233 writeReadyForQueryMessage ();
234234 }
235235
236- private void flushCommand () {
236+ private void flushCommand () throws IOException {
237237 if (DEBUG )
238238 LogManager .instance ().log (this , Level .INFO , "PSQL: flush (thread=%s)" , Thread .currentThread ().threadId ());
239- writeReadyForQueryMessage ();
239+ // Flush message does NOT generate any response according to PostgreSQL protocol.
240+ // It just forces the backend to deliver any data pending in its output buffers.
241+ // See: https://www.postgresql.org/docs/current/protocol-message-formats.html
242+ channel .flush ();
240243 }
241244
242245 private void closeCommand () throws IOException {
@@ -283,14 +286,32 @@ private void describeCommand() throws IOException {
283286 portal .cachedResultSet = browseAndCacheResultSet (resultSet , 0 );
284287 portal .columns = getColumns (portal .cachedResultSet );
285288 writeRowDescription (portal .columns );
289+ portal .rowDescriptionSent = true ;
286290 } else
287291 writeNoData ();
288292 } else {
289- if (portal .columns != null )
293+ if (portal .columns != null ) {
290294 writeRowDescription (portal .columns );
295+ portal .rowDescriptionSent = true ;
296+ }
291297 }
292298 } else if (type == 'S' ) {
293- writeNoData ();
299+ // Describe Statement: send ParameterDescription followed by RowDescription/NoData
300+ // This tells the client how many parameters the prepared statement expects
301+ writeParameterDescription (portal );
302+
303+ // Now send RowDescription or NoData
304+ // For SELECT queries, we need to determine the columns from the type schema
305+ if (portal .isExpectingResult && portal .columns == null ) {
306+ portal .columns = getColumnsFromQuerySchema (portal .query );
307+ }
308+
309+ if (portal .columns != null && !portal .columns .isEmpty ()) {
310+ writeRowDescription (portal .columns );
311+ portal .rowDescriptionSent = true ;
312+ } else {
313+ writeNoData ();
314+ }
294315 } else
295316 throw new PostgresProtocolException ("Unexpected describe type '" + type + "'" );
296317 }
@@ -328,23 +349,40 @@ private void executeCommand() {
328349 portal .executed = true ;
329350 if (portal .isExpectingResult ) {
330351 portal .cachedResultSet = browseAndCacheResultSet (resultSet , limit );
331- portal .columns = getColumns (portal .cachedResultSet );
332- writeRowDescription (portal .columns );
352+ // Only send RowDescription if not already sent during DESCRIBE
353+ // But always use columns from actual result for DataRows consistency
354+ if (!portal .rowDescriptionSent ) {
355+ portal .columns = getColumns (portal .cachedResultSet );
356+ writeRowDescription (portal .columns );
357+ portal .rowDescriptionSent = true ;
358+ }
333359 }
334360 }
335361
336362 if (portal .isExpectingResult ) {
337- if (portal .columns == null )
338- portal .columns = getColumns (portal .cachedResultSet );
363+ // Always use columns from actual result to ensure DataRows match
364+ final Map <String , PostgresType > dataRowColumns = getColumns (portal .cachedResultSet );
365+
366+ // Verify column count matches what was sent in RowDescription
367+ if (portal .columns != null && portal .columns .size () != dataRowColumns .size ()) {
368+ // Column count mismatch - use the original columns from DESCRIBE
369+ // This can happen if sample query returned different properties than actual query
370+ if (DEBUG )
371+ LogManager .instance ().log (this , Level .WARNING ,
372+ "PSQL: Column count mismatch - RowDesc=%d, DataRow=%d (thread=%s)" ,
373+ portal .columns .size (), dataRowColumns .size (), Thread .currentThread ().threadId ());
374+ }
339375
340- writeDataRows (portal .cachedResultSet , portal .columns );
376+ // Use the columns that were sent in RowDescription for consistency
377+ final Map <String , PostgresType > columnsToUse = portal .columns != null ? portal .columns : dataRowColumns ;
378+ writeDataRows (portal .cachedResultSet , columnsToUse );
341379 writeCommandComplete (portal .query , portal .cachedResultSet == null ? 0 : portal .cachedResultSet .size ());
342380 } else
343381 writeNoData ();
344382 }
345383 } catch (final CommandParsingException e ) {
346384 setErrorInTx ();
347- writeError (ERROR_SEVERITY .ERROR , "Syntax error on executing query: " + e .getCause (). getMessage (), "42601" );
385+ writeError (ERROR_SEVERITY .ERROR , "Syntax error on executing query: " + ( e .getCause () != null ? e . getCause (). getMessage () : e . getMessage () ), "42601" );
348386 } catch (final Exception e ) {
349387 setErrorInTx ();
350388 writeError (ERROR_SEVERITY .ERROR , "Error on executing query: " + e .getMessage (), "XX000" );
@@ -402,7 +440,7 @@ else if (query.query.equalsIgnoreCase("BEGIN") ||
402440
403441 } catch (final CommandParsingException e ) {
404442 setErrorInTx ();
405- writeError (ERROR_SEVERITY .ERROR , "Syntax error on executing query: " + e .getCause (). getMessage (), "42601" );
443+ writeError (ERROR_SEVERITY .ERROR , "Syntax error on executing query: " + ( e .getCause () != null ? e . getCause (). getMessage () : e . getMessage () ), "42601" );
406444 } catch (final Exception e ) {
407445 setErrorInTx ();
408446 writeError (ERROR_SEVERITY .ERROR , "Error on executing query: " + e .getMessage (), "XX000" );
@@ -504,6 +542,87 @@ private Map<String, PostgresType> getColumns(final List<Result> resultSet) {
504542 return columns ;
505543 }
506544
545+ /**
546+ * Extract column schema from a SELECT query by parsing the type name and querying for a sample row.
547+ * This is used during DESCRIBE Statement to return RowDescription before the query is executed.
548+ * ArcadeDB is schema-less so we need to query actual data to discover dynamically-added properties.
549+ */
550+ private Map <String , PostgresType > getColumnsFromQuerySchema (final String query ) {
551+ if (query == null || query .isEmpty ()) {
552+ return null ;
553+ }
554+
555+ // Try to extract the type name from the query
556+ // Patterns: "SELECT FROM TypeName", "SELECT * FROM TypeName", "SELECT ... FROM TypeName"
557+ final String upperQuery = query .toUpperCase ();
558+ final int fromIndex = upperQuery .indexOf (" FROM " );
559+ if (fromIndex < 0 ) {
560+ return null ;
561+ }
562+
563+ String afterFrom = query .substring (fromIndex + 6 ).trim ();
564+
565+ // Extract type name (ends at WHERE, LIMIT, ORDER, or end of string)
566+ String typeName = afterFrom ;
567+ for (String terminator : new String []{" WHERE " , " LIMIT " , " ORDER " , " GROUP " , ";" }) {
568+ final int idx = typeName .toUpperCase ().indexOf (terminator );
569+ if (idx > 0 ) {
570+ typeName = typeName .substring (0 , idx );
571+ }
572+ }
573+ typeName = typeName .trim ();
574+
575+ // Skip schema: prefix if present
576+ if (typeName .toLowerCase ().startsWith ("schema:" )) {
577+ return null ; // Schema queries have different structure
578+ }
579+
580+ try {
581+ // First verify the type exists
582+ final DocumentType docType = database .getSchema ().getType (typeName );
583+ if (docType == null ) {
584+ return null ;
585+ }
586+
587+ // Query for a sample row to discover all properties (including dynamically-added ones)
588+ // Use LIMIT 1 to minimize overhead
589+ final String sampleQuery = "SELECT FROM " + typeName + " LIMIT 1" ;
590+ final ResultSet resultSet = database .query ("sql" , sampleQuery , server .getConfiguration ());
591+ final List <Result > sampleRows = browseAndCacheResultSet (resultSet , 1 );
592+
593+ if (!sampleRows .isEmpty ()) {
594+ // Use the sample row to discover columns
595+ return getColumns (sampleRows );
596+ }
597+
598+ // If no rows exist, fall back to schema-defined properties
599+ final Map <String , PostgresType > columns = new LinkedHashMap <>();
600+
601+ // Add system properties first (these are returned for document/vertex types)
602+ columns .put (RID_PROPERTY , PostgresType .VARCHAR );
603+ columns .put (TYPE_PROPERTY , PostgresType .VARCHAR );
604+ columns .put (CAT_PROPERTY , PostgresType .CHAR );
605+
606+ // Add all defined properties from the type
607+ for (final String propName : docType .getPropertyNames ()) {
608+ final com .arcadedb .schema .Property prop = docType .getProperty (propName );
609+ if (prop != null && prop .getType () != null ) {
610+ columns .put (propName , PostgresType .getTypeFromArcade (prop .getType ()));
611+ } else {
612+ columns .put (propName , PostgresType .VARCHAR );
613+ }
614+ }
615+
616+ return columns ;
617+
618+ } catch (Exception e ) {
619+ if (DEBUG )
620+ LogManager .instance ().log (this , Level .WARNING , "PSQL: failed to get columns from schema for query '%s': %s" ,
621+ query , e .getMessage ());
622+ return null ;
623+ }
624+ }
625+
507626 private void writeRowDescription (final Map <String , PostgresType > columns ) {
508627 if (columns == null )
509628 return ;
@@ -618,7 +737,13 @@ private void bindCommand() {
618737 final String portalName = readString ();
619738 final String sourcePreparedStatement = readString ();
620739
621- final PostgresPortal portal = getPortal (portalName , false );
740+ // Look up the prepared statement (stored during PARSE)
741+ // The portal name may be different (often empty for unnamed portal)
742+ PostgresPortal portal = getPortal (sourcePreparedStatement , false );
743+ if (portal == null ) {
744+ // Try with portal name as fallback for backwards compatibility
745+ portal = getPortal (portalName , false );
746+ }
622747 if (portal == null ) {
623748 writeMessage ("bind complete" , null , '2' , 4 );
624749 return ;
@@ -630,6 +755,9 @@ private void bindCommand() {
630755 Thread .currentThread ().threadId ());
631756
632757 final int paramFormatCount = channel .readShort ();
758+ if (DEBUG )
759+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind paramFormatCount=%d (thread=%s)" ,
760+ paramFormatCount , Thread .currentThread ().threadId ());
633761 if (paramFormatCount > 0 ) {
634762 portal .parameterFormats = new ArrayList <>(paramFormatCount );
635763 for (int i = 0 ; i < paramFormatCount ; i ++) {
@@ -639,12 +767,21 @@ private void bindCommand() {
639767 }
640768
641769 final int paramValuesCount = channel .readShort ();
770+ if (DEBUG )
771+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind paramValuesCount=%d (thread=%s)" ,
772+ paramValuesCount , Thread .currentThread ().threadId ());
642773 if (paramValuesCount > 0 ) {
643774 portal .parameterValues = new ArrayList <>(paramValuesCount );
644775 for (int i = 0 ; i < paramValuesCount ; i ++) {
776+ if (DEBUG )
777+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind reading param %d size (thread=%s)" , i , Thread .currentThread ().threadId ());
645778 final long paramSize = channel .readUnsignedInt ();
779+ if (DEBUG )
780+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind param %d size=%d (thread=%s)" , i , paramSize , Thread .currentThread ().threadId ());
646781 final byte [] paramValue = new byte [(int ) paramSize ];
647782 channel .readBytes (paramValue );
783+ if (DEBUG )
784+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind param %d value read (thread=%s)" , i , Thread .currentThread ().threadId ());
648785
649786 // Determine format code according to PostgreSQL protocol:
650787 // - If paramFormatCount == 0: all parameters use text format (0)
@@ -664,10 +801,17 @@ private void bindCommand() {
664801 ? portal .parameterTypes .get (i )
665802 : 0L ; // UNSPECIFIED type
666803
804+ if (DEBUG )
805+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind deserializing param %d typeCode=%d formatCode=%d (thread=%s)" ,
806+ i , typeCode , formatCode , Thread .currentThread ().threadId ());
667807 portal .parameterValues .add (PostgresType .deserialize (typeCode , formatCode , paramValue ));
808+ if (DEBUG )
809+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind param %d deserialized (thread=%s)" , i , Thread .currentThread ().threadId ());
668810 }
669811 }
670812
813+ if (DEBUG )
814+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind reading resultFormatCount (thread=%s)" , Thread .currentThread ().threadId ());
671815 final int resultFormatCount = channel .readShort ();
672816 if (resultFormatCount > 0 ) {
673817 portal .resultFormats = new ArrayList <>(resultFormatCount );
@@ -680,6 +824,16 @@ private void bindCommand() {
680824 if (errorInTransaction )
681825 return ;
682826
827+ // Store the portal under the portal name (which may be empty for unnamed portal)
828+ // This is necessary because EXECUTE looks up portals by portal name, not prepared statement name
829+ // PostgreSQL protocol: PARSE creates "prepared statement", BIND creates "portal" from it
830+ if (!portalName .equals (sourcePreparedStatement )) {
831+ portals .put (portalName , portal );
832+ if (DEBUG )
833+ LogManager .instance ().log (this , Level .INFO , "PSQL: bind stored portal under name '%s' (thread=%s)" ,
834+ portalName , Thread .currentThread ().threadId ());
835+ }
836+
683837 writeMessage ("bind complete" , null , '2' , 4 );
684838
685839 } catch (final Exception e ) {
@@ -704,12 +858,25 @@ private void parseCommand() {
704858 final long param = channel .readUnsignedInt ();
705859 portal .parameterTypes .add (param );
706860 }
861+ } else {
862+ // Client sent paramCount=0 (e.g., asyncpg, node-postgres)
863+ // Detect $N placeholders in the query to determine actual parameter count
864+ final int detectedParams = detectParameterPlaceholders (query .query );
865+ if (detectedParams > 0 ) {
866+ portal .parameterTypes = new ArrayList <>(detectedParams );
867+ for (int i = 0 ; i < detectedParams ; i ++) {
868+ // Use VARCHAR (OID 1043) as default type instead of 0 (unspecified)
869+ // This prevents asyncpg from trying to introspect unknown types via pg_type
870+ portal .parameterTypes .add ((long ) PostgresType .VARCHAR .code );
871+ }
872+ }
707873 }
708874
875+ final int actualParamCount = portal .parameterTypes != null ? portal .parameterTypes .size () : 0 ;
709876 if (DEBUG )
710877 LogManager .instance ()
711- .log (this , Level .INFO , "PSQL: parse (portal=%s) -> %s (params=%d) (errorInTransaction=%s thread=%s)" , portalName ,
712- portal .query , paramCount , errorInTransaction , Thread .currentThread ().threadId ());
878+ .log (this , Level .INFO , "PSQL: parse (portal=%s) -> %s (params=%d, detected=%d ) (errorInTransaction=%s thread=%s)" ,
879+ portalName , portal .query , paramCount , actualParamCount , errorInTransaction , Thread .currentThread ().threadId ());
713880
714881 if (errorInTransaction )
715882 return ;
@@ -895,7 +1062,7 @@ private void parseCommand() {
8951062
8961063 } catch (final CommandParsingException e ) {
8971064 setErrorInTx ();
898- writeError (ERROR_SEVERITY .ERROR , "Syntax error on parsing query: " + e .getCause (). getMessage (), "42601" );
1065+ writeError (ERROR_SEVERITY .ERROR , "Syntax error on parsing query: " + ( e .getCause () != null ? e . getCause (). getMessage () : e . getMessage () ), "42601" );
8991066 } catch (final Exception e ) {
9001067 setErrorInTx ();
9011068 writeError (ERROR_SEVERITY .ERROR , "Error on parsing query: " + e .getMessage (), "XX000" );
@@ -1215,6 +1382,43 @@ private void writeNoData() {
12151382 writeMessage ("no data" , null , 'n' , 4 );
12161383 }
12171384
1385+ /**
1386+ * Writes ParameterDescription message ('t') describing the parameters of a prepared statement.
1387+ * This is required by the PostgreSQL extended query protocol for DESCRIBE 'S' (Statement).
1388+ */
1389+ private void writeParameterDescription (final PostgresPortal portal ) {
1390+ final int paramCount = portal .parameterTypes != null ? portal .parameterTypes .size () : 0 ;
1391+ // Message format: 't' + int32 length + int16 param count + int32[] type OIDs
1392+ final int messageLength = 4 + 2 + (paramCount * 4 );
1393+
1394+ writeMessage ("parameter description" , () -> {
1395+ channel .writeShort ((short ) paramCount );
1396+ if (portal .parameterTypes != null ) {
1397+ for (final Long typeOid : portal .parameterTypes ) {
1398+ channel .writeUnsignedInt (typeOid != null ? typeOid .intValue () : 0 ); // 0 = unspecified type
1399+ }
1400+ }
1401+ }, 't' , messageLength );
1402+ }
1403+
1404+ /**
1405+ * Detects $N style parameter placeholders in a query and returns the count.
1406+ * PostgreSQL uses $1, $2, etc. for positional parameters.
1407+ * Returns the highest parameter number found (e.g., "$3" returns 3).
1408+ */
1409+ private int detectParameterPlaceholders (final String query ) {
1410+ int maxParam = 0 ;
1411+ final Pattern pattern = Pattern .compile ("\\ $(\\ d+)" );
1412+ final Matcher matcher = pattern .matcher (query );
1413+ while (matcher .find ()) {
1414+ final int paramNum = Integer .parseInt (matcher .group (1 ));
1415+ if (paramNum > maxParam ) {
1416+ maxParam = paramNum ;
1417+ }
1418+ }
1419+ return maxParam ;
1420+ }
1421+
12181422 private PostgresPortal getPortal (final String name , final boolean remove ) {
12191423 if (remove )
12201424 return portals .remove (name );
0 commit comments